From hyper legacy client to reqwest 0.12, how to handle stream upload (PUT) and stream download (GET) from S3?

I have this code to PUT and GET file on S3 using legacy hyper client. Now I want to use reqwest but I don't know what to use, can you help me?

use futures::{StreamExt, TryStreamExt};
use http_body_util::{combinators::UnsyncBoxBody, BodyExt, BodyStream, StreamBody};
use hyper::{body::Frame, header, Method};
use hyper_tls::HttpsConnector;
use hyper_util::{
    client::legacy::{self, connect::HttpConnector},
    rt::TokioExecutor,
};
use std::{
    io,
    pin::Pin,
    task::{Context, Poll},
    time::Duration,
};
use tokio::io::AsyncRead;
use tokio_util::{
    bytes::Bytes,
    io::{ReaderStream, StreamReader},
};

enum HyperClientCustomBody {
    Stream(UnsyncBoxBody<Bytes, std::io::Error>),
    Empty,
}

impl hyper::body::Body for HyperClientCustomBody {
    type Data = hyper::body::Bytes;
    type Error = std::io::Error;

    fn poll_frame(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
        match &mut *self.get_mut() {
            Self::Stream(stream) => Pin::new(stream).poll_frame(cx),
            Self::Empty => Poll::Ready(None),
        }
    }
}

async fn put_file(
    &self,
    filename: &str,
    size: i64,
    reader: Pin<Box<(dyn AsyncRead + Send)>>,
) -> Result<()> {
    let hyper_client = legacy::Client::builder(TokioExecutor::new()).build::<_, HyperClientCustomBody>(HttpsConnector::new());

    hyper_client
        .request(
            hyper::Request::builder()
                .method(Method::PUT)
                .uri(get_uri())
                .header(header::CONTENT_LENGTH, size)
                .body(HyperClientCustomBody::Stream(BodyExt::boxed_unsync(
                    StreamBody::new(ReaderStream::new(reader).map_ok(Frame::data)),
                )))
                .unwrap(),
        )
        .await?;

    Ok(())
}

async fn get_file(&self, filename: &str) -> Result<Pin<Box<dyn AsyncRead + Send + Sync>>> {
    let hyper_client = legacy::Client::builder(TokioExecutor::new()).build::<_, HyperClientCustomBody>(HttpsConnector::new());

    let resp = hyper_client
        .request(
            hyper::Request::builder()
                .method(Method::GET)
                .uri(get_uri())
                .body(HyperClientCustomBody::Empty)
                .unwrap(),
        )
        .await?;

    let body = BodyStream::new(resp).filter_map(|res| async {
        Some(match res {
            Ok(frame) => Ok(frame.into_data().ok()?),
            Err(e) => Err(io::Error::other(e)),
        })
    });

    let res = Box::pin(StreamReader::new(body));

    Ok(res)
}

I tried using reqwest like this:

async fn put_file(
    &self,
    filename: &str,
    size: i64,
    reader: Pin<Box<(dyn AsyncRead + Send)>>,
) -> Result<()> {
    let http_client = reqwest::Client::new();

    http_client
        .put(get_uri())
            .header("Content-Length", size)
            // -----> I don't know what to use in body here
            // .body(StreamBody::new(ReaderStream::new(
            //     reqwest::Body::wrap_stream(reader),
            // )))
            // .body(reqwest::Body::wrap_stream(reader))
            .send()
            .await?;

    Ok(())
}

async fn get_file(&self, filename: &str) -> Result<Pin<Box<dyn AsyncRead + Send + Sync>>> {
    let hyper_client = reqwest::Client::new();

    let resp = http_client.get(get_uri()).send().await?;

    // -----> How to handle body here?

    let body = BodyStream::new(resp).filter_map(|res| async {
        Some(match res {
            Ok(frame) => Ok(frame.into_data().ok()?),
            Err(e) => Err(io::Error::other(e)),
        })
    });

    let res = Box::pin(StreamReader::new(body));

    Ok(res)
}

UPDATE:

I tried this code:

async fn put_file(
    &self,
    filename: &str,
    size: i64,
    reader: Pin<Box<(dyn AsyncRead + Send)>>,
) -> Result<()> {
    let http_client = reqwest::Client::new();

    http_client
        .put(get_uri())
            .header("Content-Length", size)
            .body(reqwest::Body::wrap_stream(ReaderStream::new(reader)))
            .send()
            .await?;

    Ok(())
}

async fn get_file(&self, filename: &str) -> Result<Pin<Box<dyn AsyncRead + Send + Sync>>> {
    let hyper_client = reqwest::Client::new();

    let resp = http_client.get(get_uri()).send().await?;

    let stream = resp
            .bytes_stream()
            .map_err(|e| io::Error::new(io::ErrorKind::Other, e));

        let res = Box::pin(StreamReader::new(stream));

    Ok(res)
}

and it works except for this error:

error[E0277]: `dyn tokio::io::AsyncRead + std::marker::Send` cannot be shared between threads safely
     |
53   |             .body(reqwest::Body::wrap_stream(ReaderStream::new(reader)))
     |                   -------------------------- ^^^^^^^^^^^^^^^^^^^^^^^^^ `dyn tokio::io::AsyncRead + std::marker::Send` cannot be shared between threads safely
     |                   |
     |                   required by a bound introduced by this call
     |
     = help: the trait `std::marker::Sync` is not implemented for `dyn tokio::io::AsyncRead + std::marker::Send`, which is required by `tokio_util::io::ReaderStream<std::pin::Pin<std::boxed::Box<dyn tokio::io::AsyncRead + std::marker::Send>>>: std::marker::Sync`
     = note: required for `std::ptr::Unique<dyn tokio::io::AsyncRead + std::marker::Send>` to implement `std::marker::Sync`
note: required because it appears within the type `std::boxed::Box<dyn tokio::io::AsyncRead + std::marker::Send>`
    --> C:\Users\Fred\.rustup\toolchains\stable-x86_64-pc-windows-msvc\lib/rustlib/src/rust\library\alloc\src\boxed.rs:197:12
     |
197  | pub struct Box<
     |            ^^^
note: required because it appears within the type `std::pin::Pin<std::boxed::Box<dyn tokio::io::AsyncRead + std::marker::Send>>`
    --> C:\Users\Fred\.rustup\toolchains\stable-x86_64-pc-windows-msvc\lib/rustlib/src/rust\library\core\src\pin.rs:1090:12
     |
1090 | pub struct Pin<Ptr> {
     |            ^^^
note: required because it appears within the type `std::option::Option<std::pin::Pin<std::boxed::Box<dyn tokio::io::AsyncRead + std::marker::Send>>>`
    --> C:\Users\Fred\.rustup\toolchains\stable-x86_64-pc-windows-msvc\lib/rustlib/src/rust\library\core\src\option.rs:572:10
     |
572  | pub enum Option<T> {
     |          ^^^^^^
note: required because it appears within the type `tokio_util::io::ReaderStream<std::pin::Pin<std::boxed::Box<dyn tokio::io::AsyncRead + std::marker::Send>>>`
    --> C:\Users\Fred\.cargo\registry\src\index.crates.io-6f17d22bba15001f\tokio-util-0.7.11\src\io\reader_stream.rs:45:16
     |
45   |     pub struct ReaderStream<R> {
     |                ^^^^^^^^^^^^
note: required by a bound in `reqwest::Body::wrap_stream`
    --> C:\Users\Fred\.cargo\registry\src\index.crates.io-6f17d22bba15001f\reqwest-0.12.4\src\async_impl\body.rs:91:53
     |
89   |     pub fn wrap_stream<S>(stream: S) -> Body
     |            ----------- required by a bound in this associated function
90   |     where
91   |         S: futures_core::stream::TryStream + Send + Sync + 'static,
     |                                                     ^^^^ required by this bound in `Body::wrap_stream`

How can I fix?

Is this code OK?

Ah, this seems to be an accidental limitation in reqwest. Working around it is a bit annoying. Using SyncWrapper (which is the third-party of the unstable std feature Exclusive), you could write something like:

struct SyncStream<S>(SyncWrapper<S>);
impl<S: Unpin + Stream> Stream for SyncStream<S> {
    type Item = S::Item;
    fn poll_next(
        self: Pin<&mut Self>,
        cx: &mut task::Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        Pin::new(&mut self.get_mut().0).get_pin_mut().poll_next(cx)
    }
}

and then use

let stream = SyncStream(SyncWrapper::new(ReaderStream::new(reader)));

with a body of reqwest::Body::wrap_stream(stream).


A quicker fix would be to add Send + Sync bounds in your dyn trait object, and this will likely work. But it’s also “evil” in that it’s not the ideal solution.

1 Like

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.