Reqwest + tokio: calculating checksum for POST body

I have the following working code:

async fn upload_file(client: reqwest::Client) {
    let reader = tokio::fs::File::open("test.txt").await.unwrap();
    let req = client
         .put("http://example.com")
         .body(reqwest::Body::from(reader))
         .build().unwrap();
    let resp = client.execute(req).await.unwrap();
}

Now I'd like to calculate a hash while streaming from the file system.

async fn upload_file(client: reqwest::Client) {
    let hasher = md5::new();
    let reader = tokio::fs::File::open("test.txt").await.unwrap();
    let hashing_reader = tokio_util::io::InspectReader::new(reader, |bytes| hasher.update(&bytes));
    let req = client
         .put("http://example.com")
         .body(reqwest::Body::from(hashing_reader))
         .build().unwrap();
    let resp = client.execute(req).await.unwrap();
    let md5sum = hasher.finalize();
}

This does not work (missing From<InspectReader<...>> for Body). Unpacking the From implementation (body.rs - source) using

pub async fn upload_file(
    reqwest_client: reqwest::Client,
    file_name: impl AsRef<Path>,
    target_url: impl IntoUrl,
) {
    let reader = tokio::fs::File::open(&file_name).await.unwrap();
    let mut hasher = Md5::new();
    let mut hashing_reader = InspectReader::new(reader, |bytes| hasher.update(bytes));
    let stream = ReaderStream::new(hashing_reader);
    let body = reqwest::Body::wrap_stream(stream);
    let req = reqwest_client.put(target_url).body(body).build().unwrap();
    let ret = reqwest_client.execute(req).await.unwrap();
    let md5sum = hasher.finalize();
}

leads to two errors: "closure may outlive the current function, but it borrows hasher, which is owned by the current function" and "cannot move out of hasher because it is borrowed".

This ought to be possible, but how?

I also tried using async_stream, but that had the same result.

Not sure I'll be able to help, but please post the full output from cargo build at the command line so that we can see the complete error in context.

I got it to build like this:

but didn't test it.

Thank you for your answer. Unfortunately, InspectReader::into_inner will just return the original reader, so the inspection does not happen.

A complete example is here

Unfortunately, the playground does not enable reqwest's stream feature flag, so the error messages there are wrong. I append the complete messages below.

error[E0373]: closure may outlive the current function, but it borrows `hasher`, which is owned by the current function
  --> src/lib/s3.rs:15:53
   |
15 |     let hashing_reader = InspectReader::new(reader, |bytes| hasher.update(bytes));
   |                                                     ^^^^^^^ ------ `hasher` is borrowed here
   |                                                     |
   |                                                     may outlive borrowed value `hasher`
   |
note: function requires argument type to outlive `'static`
  --> src/lib/s3.rs:17:16
   |
17 |     let body = reqwest::Body::wrap_stream(stream);
   |                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
help: to force the closure to take ownership of `hasher` (and any other referenced variables), use the `move` keyword
   |
15 |     let hashing_reader = InspectReader::new(reader, move |bytes| hasher.update(bytes));
   |                                                     ++++

error[E0505]: cannot move out of `hasher` because it is borrowed
  --> src/lib/s3.rs:21:18
   |
15 |     let hashing_reader = InspectReader::new(reader, |bytes| hasher.update(bytes));
   |                                                     ------- ------ borrow occurs due to use in closure
   |                                                     |
   |                                                     borrow of `hasher` occurs here
16 |     let stream = ReaderStream::new(hashing_reader);
17 |     let body = reqwest::Body::wrap_stream(stream);
   |                ---------------------------------- argument requires that `hasher` is borrowed for `'static`
...
21 |     let md5sum = hasher.finalize();
   |                  ^^^^^^ move out of `hasher` occurs here

Some errors have detailed explanations: E0373, E0505.
For more information about an error, try `rustc --explain E0373`.

I am pretty certain it's because of wrap_stream's signature:

pub fn wrap_stream<S>(stream: S) -> Body
where
    S: TryStream + Send + 'static,
    S::Error: Into<Box<dyn Error + Send + Sync>>,
    Bytes: From<S::Ok>,

The S: 'static bound is pretty restrictive. I think that is what reqwest issue #1182, "Body::wrap_stream requires 'static" (seanmonstar/reqwest on GitHub), is about.

It looks as if hyper has the same limitation: see SendRequest in hyper::client::conn::http1.

I fear it's just not possible. I would very much appreciate it if someone could correct me.

You could read the file in as a whole and only put it into the body after you generated your hash.
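To make that suggestion concrete, here is a minimal std-only sketch of "read everything, hash, then hand over the bytes". The byte_sum function is a made-up stand-in for a real checksum such as MD5, and read_and_hash is a hypothetical helper name; neither comes from reqwest or the md-5 crate.

```rust
use std::fs;
use std::io;
use std::path::Path;

/// Hypothetical stand-in for a real checksum (e.g. MD5): sums all bytes.
fn byte_sum(bytes: &[u8]) -> u64 {
    bytes
        .iter()
        .fold(0u64, |acc, &b| acc.wrapping_add(u64::from(b)))
}

/// Reads the whole file into memory, hashes it, and returns both the
/// bytes (to be used as the request body) and the checksum.
fn read_and_hash(path: impl AsRef<Path>) -> io::Result<(Vec<u8>, u64)> {
    let bytes = fs::read(path)?;
    let digest = byte_sum(&bytes);
    Ok((bytes, digest))
}
```

The returned Vec<u8> could then be passed to the request builder's body, since reqwest::Body can be built from a Vec<u8>. The cost is that the whole file sits in memory, which may not be acceptable for large uploads.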

Unfortunately, InspectReader::into_inner will just return the original reader, so the inspection does not happen.

Oops, yep, I missed that.

Your second approach could work if you wrap the hasher in an Arc/Mutex, like this:

It's a bit dodgy, though. You have to use std::sync::Mutex, not tokio::sync::Mutex, because the closure passed to InspectReader is not async. This means you'll be blocking a tokio worker thread while the hashing is processing. [EDIT] Although it occurs to me that this is unavoidable - whatever you do, the hashing will happen on the tokio worker thread, which is probably not ideal, since tokio threads are not supposed to block for any significant time.
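For readers without the playground link, the ownership pattern can be sketched with std only. This is not the code from the link: ByteSum is a made-up stand-in for the hasher, and feed_chunks is a hypothetical stand-in for an API with a 'static bound (playing the role of InspectReader plus Body::wrap_stream).

```rust
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for a real hasher such as md5::Md5.
#[derive(Default)]
struct ByteSum(u64);

impl ByteSum {
    fn update(&mut self, bytes: &[u8]) {
        for &b in bytes {
            self.0 = self.0.wrapping_add(u64::from(b));
        }
    }

    /// Non-consuming peek at the current state.
    fn current(&self) -> u64 {
        self.0
    }
}

/// Hypothetical stand-in for an API that takes ownership of a callback
/// and requires it to be `'static`, so it cannot borrow local variables.
fn feed_chunks<F>(chunks: Vec<Vec<u8>>, mut inspect: F)
where
    F: FnMut(&[u8]) + Send + 'static,
{
    for chunk in &chunks {
        inspect(chunk);
    }
}

fn hash_via_shared_hasher(chunks: Vec<Vec<u8>>) -> u64 {
    let hasher = Arc::new(Mutex::new(ByteSum::default()));

    // The closure owns its own Arc clone, so it borrows nothing from
    // this function and satisfies the `'static` bound.
    let for_closure = Arc::clone(&hasher);
    feed_chunks(chunks, move |bytes| {
        // std::sync::Mutex, because the callback is not async.
        for_closure.lock().unwrap().update(bytes);
    });

    // The caller still holds the original Arc and can read the state.
    let result = hasher.lock().unwrap().current();
    result
}
```

The lock is only held for the duration of one update call, and nothing else contends for it while the chunks are fed sequentially.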

I assume that blocking is not such a big problem (in this case), because the chunk size reading from the file is not going to be gigabytes and I hope that md5, being md5, is fast enough. I do fear deadlocks, but because the iterator is not going to be parallel, I don't see how this should happen.

The bigger problem is that it does not work with the md-5 crate. Trying it out (only replacing hasher = MyHasher {}; with hasher = md5::Md5::new()), I get the following error:

error[E0507]: cannot move out of dereference of `std::sync::MutexGuard<'_, CoreWrapper<Md5Core>>`
  --> src/lib/s3.rs:60:18
   |
60 |     let md5sum = hasher_rc.lock().unwrap().finalize();
   |                  ^^^^^^^^^^^^^^^^^^^^^^^^^ ---------- value moved due to this method call
   |                  |
   |                  move occurs because value has type `CoreWrapper<Md5Core>`, which does not implement the `Copy` trait
   |
note: `md5::Digest::finalize` takes ownership of the receiver `self`, which moves value
  --> /var/home/tob/.cargo/registry/src/index.crates.io-6f17d22bba15001f/digest-0.10.7/src/digest.rs:30:17
   |
30 |     fn finalize(self) -> Output<Self>;
   |                 ^^^^
help: you can `clone` the value and consume it, but this might not be your desired behavior
   |
60 |     let md5sum = <CoreWrapper<Md5Core> as Clone>::clone(&hasher_rc.lock().unwrap()).finalize();
   |                  ++++++++++++++++++++++++++++++++++++++++                         +

For more information about this error, try `rustc --explain E0507`.

Well, yes. But that's reading the file twice.

Hmm, yeah, looks like finalize() takes self. So the act of calling finalize() consumes the hasher instance.

I think you can work around that like this:

The extra curly brackets are important, to make sure hasher_rc is the only Arc left.
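Since the linked code is not reproduced here, the following is only a std-only sketch of one way to get an owned hasher back out of an Arc<Mutex<...>> so that a consuming finalize(self) can be called. ByteSum and feed_chunks are made-up stand-ins (for the hasher and for a 'static-bound API), not part of reqwest or md-5.

```rust
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for a hasher whose `finalize` consumes `self`,
/// like `Digest::finalize` in the error message above.
struct ByteSum(u64);

impl ByteSum {
    fn update(&mut self, bytes: &[u8]) {
        for &b in bytes {
            self.0 = self.0.wrapping_add(u64::from(b));
        }
    }

    fn finalize(self) -> u64 {
        self.0
    }
}

/// Hypothetical stand-in for an API that requires a `'static` callback.
fn feed_chunks<F>(chunks: Vec<Vec<u8>>, mut inspect: F)
where
    F: FnMut(&[u8]) + Send + 'static,
{
    for chunk in &chunks {
        inspect(chunk);
    }
}

fn hash_and_finalize(chunks: Vec<Vec<u8>>) -> u64 {
    let hasher_rc = Arc::new(Mutex::new(ByteSum(0)));
    {
        // Everything that holds a clone of the Arc lives in this block.
        let for_closure = Arc::clone(&hasher_rc);
        feed_chunks(chunks, move |bytes| {
            for_closure.lock().unwrap().update(bytes);
        });
    } // The closure and its Arc clone are gone by this point.

    // try_unwrap only succeeds if `hasher_rc` is the last Arc left.
    let mutex = Arc::try_unwrap(hasher_rc)
        .unwrap_or_else(|_| panic!("another Arc clone is still alive"));

    // into_inner yields the owned hasher, so `finalize(self)` compiles.
    mutex.into_inner().unwrap().finalize()
}
```

In the real upload function the clone would be held by the request body, so the request has to be executed and dropped before try_unwrap can succeed.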

I tend to agree about the blocking; my guess is that it won't be a big deal.

Thank you for your answer! That works. I modified the example to also compare the md5 with the ETAG.

Would you look at it and tell me whether something looks fishy? I might contribute the example to the reqwest crate, because it seems like a regular request.

Yep, code looks fine to me. I'd be glad to see it added to the reqwest 'examples' folder.