I have this code that PUTs and GETs a file on S3 using the legacy hyper client. Now I want to switch to reqwest, but I don't know which body types to use for the request and the response. Can you help me?
use futures::{StreamExt, TryStreamExt};
use http_body_util::{combinators::UnsyncBoxBody, BodyExt, BodyStream, StreamBody};
use hyper::{body::Frame, header, Method};
use hyper_tls::HttpsConnector;
use hyper_util::{
client::legacy::{self, connect::HttpConnector},
rt::TokioExecutor,
};
use std::{
io,
pin::Pin,
task::{Context, Poll},
time::Duration,
};
use tokio::io::AsyncRead;
use tokio_util::{
bytes::Bytes,
io::{ReaderStream, StreamReader},
};
/// Request body type handed to the legacy hyper client in this file.
///
/// Both the upload and the download build their client with this one body
/// type, so it has to cover a streamed payload as well as "no payload".
enum HyperClientCustomBody {
/// A boxed streaming body that yields `Bytes` and reports failures as `std::io::Error`.
Stream(UnsyncBoxBody<Bytes, std::io::Error>),
/// A body with no frames; polling it reports end-of-body immediately.
Empty,
}
/// Lets a [`HyperClientCustomBody`] be used as a hyper request body.
impl hyper::body::Body for HyperClientCustomBody {
    type Data = hyper::body::Bytes;
    type Error = std::io::Error;

    /// Yields the next frame of the body.
    ///
    /// `Empty` signals end-of-body right away; `Stream` forwards the poll to
    /// the boxed body it wraps and returns whatever that body returns.
    fn poll_frame(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
        match self.get_mut() {
            Self::Empty => Poll::Ready(None),
            Self::Stream(inner) => hyper::body::Body::poll_frame(Pin::new(inner), cx),
        }
    }
}
/// Streams the bytes produced by `reader` in an HTTP `PUT` to the URI returned by `get_uri()`.
///
/// `size` is sent as the `Content-Length` header. `filename` is not read by
/// this function. The response is awaited and then dropped, so its status
/// code is never inspected.
///
/// # Errors
/// Propagates the error returned by the hyper client's `request` call.
///
/// # Panics
/// Panics if building the request fails.
async fn put_file(
    &self,
    filename: &str,
    size: i64,
    reader: Pin<Box<(dyn AsyncRead + Send)>>,
) -> Result<()> {
    let client = legacy::Client::builder(TokioExecutor::new())
        .build::<_, HyperClientCustomBody>(HttpsConnector::new());

    let builder = hyper::Request::builder()
        .method(Method::PUT)
        .uri(get_uri())
        .header(header::CONTENT_LENGTH, size);

    // Turn the reader into a stream of byte chunks, then wrap every chunk in
    // a data frame so the stream can act as an HTTP body.
    let frames = ReaderStream::new(reader).map_ok(Frame::data);
    let payload = HyperClientCustomBody::Stream(BodyExt::boxed_unsync(StreamBody::new(frames)));

    let request = builder.body(payload).unwrap();
    client.request(request).await?;
    Ok(())
}
/// Issues an HTTP `GET` for the URI returned by `get_uri()` and exposes the
/// response body as an `AsyncRead`.
///
/// `filename` is not read by this function, and the response status code is
/// not inspected before the body is handed back.
///
/// # Errors
/// Propagates the error returned by the hyper client's `request` call. Errors
/// raised later while the body is being read reach the returned reader as
/// `io::Error`.
///
/// # Panics
/// Panics if building the request fails.
async fn get_file(&self, filename: &str) -> Result<Pin<Box<dyn AsyncRead + Send + Sync>>> {
    let client = legacy::Client::builder(TokioExecutor::new())
        .build::<_, HyperClientCustomBody>(HttpsConnector::new());

    let request = hyper::Request::builder()
        .method(Method::GET)
        .uri(get_uri())
        .body(HyperClientCustomBody::Empty)
        .unwrap();
    let resp = client.request(request).await?;

    // Keep only data frames; any frame that is not data is skipped, and body
    // errors are converted to `io::Error` as `StreamReader` requires.
    let chunks = BodyStream::new(resp).filter_map(|item| async {
        match item {
            Ok(frame) => frame.into_data().ok().map(Ok),
            Err(e) => Some(Err(io::Error::other(e))),
        }
    });

    let body_reader: Pin<Box<dyn AsyncRead + Send + Sync>> = Box::pin(StreamReader::new(chunks));
    Ok(body_reader)
}
I tried using reqwest like this:
async fn put_file(
&self,
filename: &str,
size: i64,
reader: Pin<Box<(dyn AsyncRead + Send)>>,
) -> Result<()> {
let http_client = reqwest::Client::new();
http_client
.put(get_uri())
.header("Content-Length", size)
// -----> I don't know what to use in body here
// .body(StreamBody::new(ReaderStream::new(
// reqwest::Body::wrap_stream(reader),
// )))
// .body(reqwest::Body::wrap_stream(reader))
.send()
.await?;
Ok(())
}
/// Issues an HTTP `GET` for the URI returned by `get_uri()` and exposes the
/// response body as an `AsyncRead`.
///
/// `filename` is not read by this function.
///
/// # Errors
/// Returns the `reqwest` error if the request cannot be sent, or if the
/// server answers with a 4xx/5xx status. Errors raised later while the body
/// is being read reach the returned reader as `io::Error`.
async fn get_file(&self, filename: &str) -> Result<Pin<Box<dyn AsyncRead + Send + Sync>>> {
    let http_client = reqwest::Client::new();
    let resp = http_client
        .get(get_uri())
        .send()
        .await?
        // Without this, an error response body would be returned as if it
        // were the requested file.
        .error_for_status()?;

    // `bytes_stream` yields the body as `Bytes` chunks; `StreamReader` needs
    // the stream's error type to be `io::Error`.
    let chunks = resp.bytes_stream().map_err(io::Error::other);

    let body_reader: Pin<Box<dyn AsyncRead + Send + Sync>> = Box::pin(StreamReader::new(chunks));
    Ok(body_reader)
}
UPDATE:
I tried this code:
async fn put_file(
&self,
filename: &str,
size: i64,
reader: Pin<Box<(dyn AsyncRead + Send)>>,
) -> Result<()> {
let http_client = reqwest::Client::new();
http_client
.put(get_uri())
.header("Content-Length", size)
.body(reqwest::Body::wrap_stream(ReaderStream::new(reader)))
.send()
.await?;
Ok(())
}
/// Issues an HTTP `GET` for the URI returned by `get_uri()` and exposes the
/// response body as an `AsyncRead`.
///
/// `filename` is not read by this function.
///
/// # Errors
/// Returns the `reqwest` error if the request cannot be sent, or if the
/// server answers with a 4xx/5xx status. Errors raised later while the body
/// is being read reach the returned reader as `io::Error`.
async fn get_file(&self, filename: &str) -> Result<Pin<Box<dyn AsyncRead + Send + Sync>>> {
    // The client was previously bound as `hyper_client` but used as
    // `http_client`; one name is used throughout now.
    let http_client = reqwest::Client::new();
    let resp = http_client
        .get(get_uri())
        .send()
        .await?
        // Without this, an error response body would be returned as if it
        // were the requested file.
        .error_for_status()?;

    // `StreamReader` needs the stream's error type to be `io::Error`.
    let stream = resp.bytes_stream().map_err(io::Error::other);

    let res: Pin<Box<dyn AsyncRead + Send + Sync>> = Box::pin(StreamReader::new(stream));
    Ok(res)
}
It works, except that `put_file` fails to compile with this error:
error[E0277]: `dyn tokio::io::AsyncRead + std::marker::Send` cannot be shared between threads safely
|
53 | .body(reqwest::Body::wrap_stream(ReaderStream::new(reader)))
| -------------------------- ^^^^^^^^^^^^^^^^^^^^^^^^^ `dyn tokio::io::AsyncRead + std::marker::Send` cannot be shared between threads safely
| |
| required by a bound introduced by this call
|
= help: the trait `std::marker::Sync` is not implemented for `dyn tokio::io::AsyncRead + std::marker::Send`, which is required by `tokio_util::io::ReaderStream<std::pin::Pin<std::boxed::Box<dyn tokio::io::AsyncRead + std::marker::Send>>>: std::marker::Sync`
= note: required for `std::ptr::Unique<dyn tokio::io::AsyncRead + std::marker::Send>` to implement `std::marker::Sync`
note: required because it appears within the type `std::boxed::Box<dyn tokio::io::AsyncRead + std::marker::Send>`
--> C:\Users\Fred\.rustup\toolchains\stable-x86_64-pc-windows-msvc\lib/rustlib/src/rust\library\alloc\src\boxed.rs:197:12
|
197 | pub struct Box<
| ^^^
note: required because it appears within the type `std::pin::Pin<std::boxed::Box<dyn tokio::io::AsyncRead + std::marker::Send>>`
--> C:\Users\Fred\.rustup\toolchains\stable-x86_64-pc-windows-msvc\lib/rustlib/src/rust\library\core\src\pin.rs:1090:12
|
1090 | pub struct Pin<Ptr> {
| ^^^
note: required because it appears within the type `std::option::Option<std::pin::Pin<std::boxed::Box<dyn tokio::io::AsyncRead + std::marker::Send>>>`
--> C:\Users\Fred\.rustup\toolchains\stable-x86_64-pc-windows-msvc\lib/rustlib/src/rust\library\core\src\option.rs:572:10
|
572 | pub enum Option<T> {
| ^^^^^^
note: required because it appears within the type `tokio_util::io::ReaderStream<std::pin::Pin<std::boxed::Box<dyn tokio::io::AsyncRead + std::marker::Send>>>`
--> C:\Users\Fred\.cargo\registry\src\index.crates.io-6f17d22bba15001f\tokio-util-0.7.11\src\io\reader_stream.rs:45:16
|
45 | pub struct ReaderStream<R> {
| ^^^^^^^^^^^^
note: required by a bound in `reqwest::Body::wrap_stream`
--> C:\Users\Fred\.cargo\registry\src\index.crates.io-6f17d22bba15001f\reqwest-0.12.4\src\async_impl\body.rs:91:53
|
89 | pub fn wrap_stream<S>(stream: S) -> Body
| ----------- required by a bound in this associated function
90 | where
91 | S: futures_core::stream::TryStream + Send + Sync + 'static,
| ^^^^ required by this bound in `Body::wrap_stream`
How can I fix this error?
Apart from that, is this code OK?