Upgrading from hyper v0 to hyper v1 still using the `legacy::Client`

I wrote a simple S3 client with axum v0.6 and hyper v0 using the hyper's (now deprecated/legacy Client).

I don't want to use reqwest or others because I need very simple and not complex features.

Now I need to update to axum v0.7 and hyper v1: I'm a bit lost.

This is the code that works with hyper v0 | The same on Rust Playground

This is the code that doesn't work with hyper v1 | The same on Rust Playground

Since I'm losing my mind, actually, I've already lost it, can you please help me understand how to change it for the new hyper version?

The most important thing for me is that the file is streamed and not copied into memory, both for uploading and downloading.

The code with both v0 (commented) and v1 (the second link above):

/*
[dependencies]
axum = { version = "0.7.4", default-features = false }
bytes = { version = "1.5.0", default-features = false }
tokio = { version = "1.0", features = ["full"] }
futures = { version = "0.3.30", default-features = false, features = ["std"] }
hyper = { version = "1.1.0", features = ["full"] }
hyper-tls = { version = "0.6.0", default-features = false }
hyper-util = { version = "0.1.2", features = [
    "client",
    "client-legacy",
    "http1",
    "http2",
] }
http-body-util = { version = "0.1.0", default-features = false }
tokio-util = { version = "0.7.10", default-features = false, features = [
    "compat",
    "io",
] }
*/

use futures::TryStreamExt;
use http_body_util::{combinators::BoxBody, BodyExt, BodyStream, Full, StreamBody};
use hyper::{
    body::{Frame, Incoming},
    header, Method,
};
use hyper_tls::HttpsConnector;
use hyper_util::{
    client::legacy::{self, connect::HttpConnector},
    rt::TokioExecutor,
};
use std::{error::Error, pin::Pin, time::Duration};
use tokio::io::AsyncRead;
use tokio_util::{
    bytes::Bytes,
    compat::FuturesAsyncReadCompatExt,
    io::{ReaderStream, StreamReader},
};

pub struct S3Client {
    credentials: String,
    hyper_client: legacy::Client<HttpsConnector<HttpConnector>, BoxBody<Bytes, std::io::Error>>,
}

impl S3Client {
    pub fn new(credentials: &str) -> Result<Self, Box<dyn Error>> {
        // With hyper v0
        // let hyper_client = hyper::Client::builder().build::<_, hyper::Body>(HttpsConnector::new());

        let hyper_client =
            legacy::Client::builder(TokioExecutor::new())
                .build::<_, BoxBody<Bytes, std::io::Error>>(HttpsConnector::new());

        Ok(Self {
            credentials: credentials.to_string(),
            hyper_client,
        })
    }

    pub async fn put_file(
        &self,
        size: i64,
        reader: Pin<Box<(dyn AsyncRead + Send + Sync)>>,
    ) -> Result<(), Box<dyn Error>> {
        self.hyper_client
            .request(
                hyper::Request::builder()
                    .method(Method::PUT)
                    .uri(&self.credentials)
                    .header(header::CONTENT_LENGTH, size)
                    // With hyper v0
                    // .body(hyper::Body::wrap_stream(ReaderStream::new(reader)))
                    .body(StreamBody::new(ReaderStream::new(reader).map_ok(Frame::data)).boxed())
                    .unwrap(),
            )
            .await?;

        Ok(())
    }

    pub async fn get_file(
        &self,
        id: &str,
    ) -> Result<Pin<Box<dyn AsyncRead + Send + Sync>>, Box<dyn Error>> {
        let response = self
            .hyper_client
            .get((format!("{}/{id}", self.credentials)).parse()?)
            .await?
            .into_body()
            // With hyper v0
            // .map_err(|e| futures::io::Error::new(futures::io::ErrorKind::Other, e))
            // .into_async_read()
            // .compat()
            ;

        let result = StreamReader::new(BodyStream::new(response));

        // With hyper v0
        // Ok(Box::pin(response))

        Ok(result)
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let s3_client = S3Client::new("something here")?;

    // I have a "request: axum::extract::Request," arg now in axum's v0.7 handler

    // let stream = request
    //     .into_body()
    //     .into_data_stream()
    //     .map_err(|err| io::Error::new(io::ErrorKind::Other, err));

    // s3_client
    //     .put_file(123, Box::pin(StreamReader::new(stream)))
    //     .await?;

    Ok(())
}

I got it working like so:

    pub async fn put_file(
        &self,
        size: i64,
        reader: Pin<Box<(dyn AsyncRead + Send + Sync)>>,
    ) -> Result<(), Box<dyn Error>> {
        self.hyper_client
            .request(
                hyper::Request::builder()
                    .method(Method::PUT)
                    .uri(&self.credentials)
                    .header(header::CONTENT_LENGTH, size)
                    .body(BodyExt::boxed(StreamBody::new(
                        ReaderStream::new(reader).map_ok(Frame::data),
                    )))
                    .unwrap(),
            )
            .await?;

        Ok(())
    }

    pub async fn get_file(
        &self,
        id: &str,
    ) -> Result<Pin<Box<dyn AsyncRead + Send + Sync>>, Box<dyn Error>> {
        let body = self
            .hyper_client
            .get((format!("{}/{id}", self.credentials)).parse()?)
            .await?
            .into_body();

        let body = BodyStream::new(body).filter_map(|res| async {
            Some(match res {
                Ok(frame) => Ok(frame.into_data().ok()?),
                Err(e) => Err(io::Error::other(e)),
            })
        });

        Ok(Box::pin(StreamReader::new(body)))
    }
1 Like

Wow, as usual you are very fast, very precise and very good!

I'm slowly understanding where I was going wrong.

Thank you!!!

Now I'm having difficulties with an axum middleware which wraps EVERY call in my backend:

The code is very simple, yet I cannot understand how to create and how to use the new hyper v1 types (legacy::Client included):

/*
[dependencies]
axum = { version = "0.7.4" }
tokio = { version = "1.0", features = ["full"] }
hyper = { version = "1.1.0", features = ["full"] }
hyper-util = { version = "0.1.2", features = [
    "client",
    "client-legacy",
    "http1",
] }
*/

use std::sync::Arc;

use axum::{
    body::Body,
    extract::{Request, State},
    http::StatusCode,
    middleware::{self, Next},
    response::{Html, IntoResponse, Response},
    routing::get,
    Router,
};
use hyper::body::Incoming;
use hyper_util::{
    client::legacy::{self, connect::HttpConnector},
    rt::TokioExecutor,
};

pub struct AppState {
    pub hyper_client: legacy::Client<HttpConnector, Incoming>,
}

#[tokio::main]
async fn main() {
    let hyper_client = legacy::Client::builder(TokioExecutor::new()).build_http();

    let app_state = Arc::new(AppState { hyper_client });

    let app = Router::new()
        .route("/", get(handler))
        .route_layer(middleware::from_fn_with_state(app_state, check));

    let listener = tokio::net::TcpListener::bind("127.0.0.1:3000")
        .await
        .unwrap();

    axum::serve(listener, app.with_state(app_state).into_make_service())
        .await
        .unwrap();
}

async fn handler() -> Html<&'static str> {
    Html("<h1>Hello, World!</h1>")
}

async fn check(
    State(state): State<Arc<AppState>>,
    mut req: Request<Body>,
    next: Next,
) -> Result<Response, StatusCode> {
    let cookie = req
        .headers()
        .get("Cookie")
        .and_then(|c| c.to_str().ok())
        .unwrap_or("");

    // I don't know what type to use for the client, I also tried with:
    // let client = legacy::Client::builder(TokioExecutor::new())
    //     // .build::<_, BoxBody<Bytes, std::io::Error>>(HttpsConnector::new())(
    //     // .build::<_, Full<Bytes>>(HttpConnector::new());
    //     // .build::<_, BoxBody(HttpConnector::new());
    //     .build::<_, String>(HttpConnector::new());

    let response = state
        .hyper_client
        .request(
            // I don't know what type to use here
            Request::builder()
                .uri("/check")
                .header("Cookie", cookie)
                // .body(Empty::new())
                // .body(BoxBody::new(String::new()))
                .body(String::new())
                .unwrap(),
        )
        .await;

    if let Ok(resp) = response {
        // The response from /check endpoint is a json one, but how to get that json here?
        // let response_as_json = deserialize_json(resp.into_body())?;

        // And to retrieve the response as string?
        let response_as_string = resp.into_string();

        if response_as_string == "i_am_the_winner_cookie" {
            req.extensions_mut().insert(response_as_string);

            return Ok(next.run(req).await);
        }
    }

    Err(StatusCode::UNAUTHORIZED)
}

I think I somehow managed to fix it, except for the final transformation into JSON:

/*
[dependencies]
axum = { version = "0.7.4" }
tokio = { version = "1.0", features = ["full"] }
hyper = { version = "1.1.0", features = ["full"] }
hyper-util = { version = "0.1.2", features = [
    "client",
    "client-legacy",
    "http1",
] }
http-body-util = { version = "0.1.0" }
nanoserde = { version = "0.1.35" }
*/

use axum::{
    body::Body,
    extract::{Request, State},
    http::StatusCode,
    middleware::{self, Next},
    response::{Html, Response},
    routing::get,
    Router,
};
use http_body_util::BodyExt;
use hyper_util::{
    client::legacy::{self, connect::HttpConnector},
    rt::TokioExecutor,
};
use nanoserde::{DeBin, DeJson};
use std::sync::Arc;

pub struct AppState {
    pub hyper_client: legacy::Client<HttpConnector, Body>,
}

#[derive(nanoserde::DeBin, nanoserde::DeJson, Clone)]
pub struct ResponseAsJson {
    id: String,
    name: String,
}

#[tokio::main]
async fn main() {
    let hyper_client = legacy::Client::builder(TokioExecutor::new()).build_http();

    let app_state = Arc::new(AppState { hyper_client });

    let app = Router::new()
        .route("/", get(handler))
        .route_layer(middleware::from_fn_with_state(app_state.clone(), check));

    let listener = tokio::net::TcpListener::bind("127.0.0.1:3000")
        .await
        .unwrap();

    axum::serve(listener, app.with_state(app_state).into_make_service())
        .await
        .unwrap();
}

async fn handler() -> Html<&'static str> {
    Html("<h1>Hello, World!</h1>")
}

async fn check(
    State(state): State<Arc<AppState>>,
    mut req: Request<Body>,
    next: Next,
) -> Result<Response, StatusCode> {
    let cookie = req
        .headers()
        .get("Cookie")
        .and_then(|c| c.to_str().ok())
        .unwrap_or("");

    let response = state
        .hyper_client
        .request(
            Request::builder()
                .uri("/check")
                .header("Cookie", cookie)
                .body(Body::empty())
                .unwrap(),
        )
        .await;

    if let Ok(resp) = response {
        // How to retrieve the JSON here?
        let response_as_json = ResponseAsJson::deserialize_bin(resp);

        if response_as_json.is_ok() {
            req.extensions_mut().insert(response_as_json);

            return Ok(next.run(req).await);
        }
    }

    Err(StatusCode::UNAUTHORIZED)
}

How can I fix that?

You just need to take the body, and then use the BodyExt::collect method to convert it all into a contiguous byte sequence.

        if let Ok(body) = resp.into_body().collect().await {
            let response_as_json = ResponseAsJson::deserialize_bin(&body.to_bytes());

            if response_as_json.is_ok() {
                req.extensions_mut().insert(response_as_json);

                return Ok(next.run(req).await);
            }
        }
1 Like

@SabrinaJewson I think this is not correct if I change from:

reader: Pin<Box<(dyn AsyncRead + Send + Sync)>>,

to:

reader: Pin<Box<(dyn AsyncRead + Send)>>,

because of: Why with axum v0.7 the stream is no `Sync` anymore? · tokio-rs/axum · Discussion #2540 · GitHub.

How can I fix the error now?

The code and the error: undefined | Rust Explorer.

The error now is:

error[E0277]: `dyn tokio::io::AsyncRead + std::marker::Send` cannot be shared between threads safely
    |
65  |                       .body(BodyExt::boxed(StreamBody::new(
    |  ___________________________--------------_^
    | |                           |
    | |                           required by a bound introduced by this call
66  | |                         ReaderStream::new(reader).map_ok(Frame::data),
67  | |                     )))
    | |_____________________^ `dyn tokio::io::AsyncRead + std::marker::Send` cannot be shared between threads safely
    |
    = help: the trait `Sync` is not implemented for `dyn tokio::io::AsyncRead + std::marker::Send`
    = note: required for `std::ptr::Unique<dyn tokio::io::AsyncRead + std::marker::Send>` to implement `Sync`

I think you just need to replace usages of BoxBody with UnsyncBoxBody, and replace boxed with boxed_unsync.

1 Like