Warp future is not Send as it awaits other future

Hey,

I'm currently writing a web application using warp. So far so good, but unfortunately I'm running into an issue.
I would like to await a future inside the handler, but it does not seem to work.

async fn get_handler(ipfs: SharedIpfsClient) -> Result<impl warp::Reply, Infallible> {
    let ipfs = ipfs.lock().unwrap();
    let data = Cursor::new("Hallo World");

    // Type error because of the await
    // [rustc] [H] future is not `Send` as it awaits another future which is not `Send`
    // [rustc] [H] future is not `Send` as this value is used across an await
    ipfs.client.add(data).await;

    Ok(warp::reply::reply())
}

async fn post_handler(_ipfs: SharedIpfsClient) -> Result<impl warp::Reply, Infallible> {
    Ok(warp::reply::reply())
}

pub fn api(
    ipfs: SharedIpfsClient,
) -> impl Filter<Extract = impl warp::Reply, Error = warp::Rejection> + Clone {
    warp::path("ipfs")
        .and(
            warp::get()
                .and(with_ipfs_client(ipfs.clone()))
                .and_then(get_handler),
        )
        .or(warp::post()
            .and(with_ipfs_client(ipfs.clone()))
            .and_then(post_handler))
}

Any idea would be great.

I'm already using and_then, which should make it work since it supports async handlers. Unfortunately, no luck :frowning:

Looking forward to your suggestions.

Thanks for the time!!!
kind regards

Please post the entire error message as printed by cargo check. The two commented lines in your snippet look like they came from an IDE popup, and are missing important context that will help us diagnose your problem. Alternatively, you could provide a link to a playground or cloneable Git repo that reproduces the error.

Based on just the information you posted, my educated guess is that the lock object returned by ipfs.lock().unwrap() is not Send, so if its lifetime crosses an await then the Future represented by the enclosing async fn won't be Send either. If SharedIpfsClient is a type you control, you could address this by replacing whatever mutex it's using currently with a tokio::sync::Mutex, whose lock object is Send. However, this isn't always the best solution—see the caveats on that docs page—and it's hard to say what's appropriate for your use-case without more context.
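To illustrate the other common option (not holding the guard across the await at all), here is a rough std-only sketch. Client, Connection, handler, assert_send and block_on are hypothetical stand-ins, not the real IpfsClient or warp APIs, and the sketch assumes the client can be cloned cheaply:

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical stand-in for the real client; assumed to be cheap to clone.
#[derive(Clone)]
struct Client;

impl Client {
    // Stand-in for an async request; returns the number of bytes "added".
    async fn add(&self, data: &str) -> usize {
        data.len()
    }
}

struct Connection {
    client: Client,
}

type Shared = Arc<Mutex<Connection>>;

async fn handler(shared: Shared) -> usize {
    // Take the lock in an inner scope so the std MutexGuard is dropped
    // before the first `.await`; only the cloned client lives across it.
    let client = {
        let guard = shared.lock().unwrap();
        guard.client.clone()
    };
    client.add("Hallo World").await
}

// Compile-time check: this only accepts values that are `Send`.
fn assert_send<T: Send>(value: T) -> T {
    value
}

struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Minimal busy-polling executor, only suitable for futures that complete
// without needing a real wakeup (as in this sketch).
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}
```

Passing handler(shared) through assert_send compiles here because no MutexGuard is alive at the await point. If the guard were bound in the function's outer scope instead, the same call would be rejected with the "future is not Send" error.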

Thanks for the quick response. Please find the error log from the CLI below. I tried wrapping it with tokio::sync::Mutex, but that didn't work either.

After reverting, the struct looks as follows:

use std::sync::{Arc, Mutex};
use ipfs_api_backend_hyper::IpfsClient;

pub struct IpfsConnection {
    pub client: IpfsClient,
}

pub type SharedIpfsClient = Arc<Mutex<IpfsConnection>>;

Log:


error: future cannot be sent between threads safely
   --> src/api/rest/ipfs/mod.rs:30:18
    |
30  |                 .and_then(get_handler),
    |                  ^^^^^^^^ future returned by `get_handler` is not `Send`
    |
    = help: within `impl futures_util::Future<Output = Result<Opaque(DefId(0:124 ~ hihloo_onpremise_io[1e1d]::api::rest::ipfs::get_handler::{opaque#0}::{opaque#0}), []), Infallible>>`, the trait `std::marker::Send` is not implemented for `std::sync::MutexGuard<'_, IpfsConnection>`
note: future is not `Send` as this value is used across an await
   --> src/api/rest/ipfs/mod.rs:14:18
    |
12  |     let x = ipfs.lock().unwrap();
    |         - has type `std::sync::MutexGuard<'_, IpfsConnection>` which is not `Send`
13  |     let data = std::io::Cursor::new("a");
14  |     let result = x.client.add(data).await;
    |                  ^^^^^^^^^^^^^^^^^^^^^^^^ await occurs here, with `x` maybe used later
...
17  | }
    | - `x` is later dropped here
note: required by a bound in `warp::Filter::and_then`
   --> /Users/schr3da/.cargo/registry/src/github.com-1ecc6299db9ec823/warp-0.3.2/src/filter/mod.rs:259:32
    |
259 |         F::Output: TryFuture + Send,
    |                                ^^^^ required by this bound in `warp::Filter::and_then`

error: future cannot be sent between threads safely
   --> src/api/rest/ipfs/mod.rs:30:18
    |
30  |                 .and_then(get_handler),
    |                  ^^^^^^^^ future returned by `get_handler` is not `Send`
    |
    = help: the trait `std::marker::Send` is not implemented for `dyn futures_util::Future<Output = Result<AddResponse, ipfs_api_backend_hyper::Error>>`
note: future is not `Send` as it awaits another future which is not `Send`
   --> src/api/rest/ipfs/mod.rs:14:18
    |
14  |     let result = x.client.add(data).await;
    |                  ^^^^^^^^^^^^^^^^^^ await occurs here on type `Pin<Box<dyn futures_util::Future<Output = Result<AddResponse, ipfs_api_backend_hyper::Error>>>>`, which is not `Send`
note: required by a bound in `warp::Filter::and_then`
   --> /Users/schr3da/.cargo/registry/src/github.com-1ecc6299db9ec823/warp-0.3.2/src/filter/mod.rs:259:32
    |
259 |         F::Output: TryFuture + Send,
    |                                ^^^^ required by this bound in `warp::Filter::and_then`

error: future cannot be sent between threads safely
  --> src/api/rest/ipfs/mod.rs:28:13
   |
28 | /             warp::get()
29 | |                 .and(with_ipfs_client(ipfs.clone()))
30 | |                 .and_then(get_handler),
   | |______________________________________^ future returned by `get_handler` is not `Send`
   |
   = help: within `impl futures_util::Future<Output = Result<Opaque(DefId(0:124 ~ hihloo_onpremise_io[1e1d]::api::rest::ipfs::get_handler::{opaque#0}::{opaque#0}), []), Infallible>>`, the trait `std::marker::Send` is not implemented for `std::sync::MutexGuard<'_, IpfsConnection>`
note: future is not `Send` as this value is used across an await
  --> src/api/rest/ipfs/mod.rs:14:18
   |
12 |     let x = ipfs.lock().unwrap();
   |         - has type `std::sync::MutexGuard<'_, IpfsConnection>` which is not `Send`
13 |     let data = std::io::Cursor::new("a");
14 |     let result = x.client.add(data).await;
   |                  ^^^^^^^^^^^^^^^^^^^^^^^^ await occurs here, with `x` maybe used later
...
17 | }
   | - `x` is later dropped here

error: future cannot be sent between threads safely
  --> src/api/rest/ipfs/mod.rs:28:13
   |
28 | /             warp::get()
29 | |                 .and(with_ipfs_client(ipfs.clone()))
30 | |                 .and_then(get_handler),
   | |______________________________________^ future returned by `get_handler` is not `Send`
   |
   = help: the trait `std::marker::Send` is not implemented for `dyn futures_util::Future<Output = Result<AddResponse, ipfs_api_backend_hyper::Error>>`
note: future is not `Send` as it awaits another future which is not `Send`
  --> src/api/rest/ipfs/mod.rs:14:18
   |
14 |     let result = x.client.add(data).await;
   |                  ^^^^^^^^^^^^^^^^^^ await occurs here on type `Pin<Box<dyn futures_util::Future<Output = Result<AddResponse, ipfs_api_backend_hyper::Error>>>>`, which is not `Send`

Thanks for the effort!
regards

Thanks for the additional information! So keeping the std::sync::MutexGuard over the await is indeed the problem. I can solve the problem (in a minimized example) by replacing std::sync::Mutex with tokio::sync::Mutex: playground. Notes:

  1. I don't know exactly why using tokio::sync::Mutex didn't work when you tried it. My guess is that you misinterpreted my suggestion and tried wrapping it around SharedIpfsClient instead of replacing std::sync::Mutex.
  2. I assumed that IpfsClient::add takes &mut self. If it actually takes &self, then you don't need any kind of Mutex because you can get &IpfsClient from Arc<IpfsConnection>.
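As a small sketch of note 2, assuming the method really does take &self: a plain Arc is enough to share the connection, with no Mutex involved. Client, Connection and add_from_threads are hypothetical names, and the synchronous add here is only a stand-in for the real async call:

```rust
use std::sync::Arc;
use std::thread;

// Hypothetical stand-in for a client whose method only needs `&self`.
struct Client;

impl Client {
    fn add(&self, data: &str) -> usize {
        data.len()
    }
}

struct Connection {
    client: Client,
}

// Shares one connection between several threads through `Arc` alone.
// Because `add` takes `&self`, no lock is required for shared access.
fn add_from_threads(shared: Arc<Connection>) -> Vec<usize> {
    let handles: Vec<_> = vec!["a", "bb", "ccc"]
        .into_iter()
        .map(|data| {
            let conn = Arc::clone(&shared);
            thread::spawn(move || conn.client.add(data))
        })
        .collect();

    handles.into_iter().map(|h| h.join().unwrap()).collect()
}
```

In the warp setup this would correspond to making SharedIpfsClient an Arc<IpfsConnection> and calling the method on the shared reference directly.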

Thanks for your quick reply. I tried it out, but it does not seem to work in my project.
Please find the git repo here: https://github.com/Schr3da/test-repository

The code has been prepared so that it fails to compile with the mentioned error.

I would be very happy if you had a few minutes to look into it.

Kind regards

Thanks for sharing the complete code. The error you're running into now is a slightly different one, although the symptom ("future returned by get_handler is not Send") is the same. The problem is that the signature of <IpfsClient as IpfsApi>::add has the return type

Pin<Box<dyn Future<Output = Result<AddResponse, Self::Error>> + 'a>>

(equivalently LocalBoxFuture<'a, Result<AddResponse, Self::Error>> from the futures crate), for some lifetime 'a. Now dyn Future<Output = ...> + 'a is not Send, because you can coerce to it from a !Send type. A type-erased future that is Send looks like dyn Future<Output = ...> + Send + 'a. There is no way to fix this from your own code; you would need ipfs_api_backend_hyper::IpfsApi::add to change its return type to include that + Send. Since ipfs_api_backend_hyper uses the async-trait crate, they might be able to do this simply by changing

- #[async_trait(?Send)]
+ #[async_trait]
pub trait IpfsApi: Backend {

But they might have a good reason to be using async_trait(?Send)—I can't tell without digging into the code—and in any case this would be a breaking change. Still, I'd recommend opening an issue/PR with them to ask about this!
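To make the difference between the two boxed types concrete, here is a minimal std-only sketch. The aliases LocalBoxFuture and SendBoxFuture and the functions add_local, add_send and is_send are hypothetical names for illustration, not part of ipfs_api_backend_hyper:

```rust
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;

// Type-erased future WITHOUT a `Send` bound, the shape described above.
type LocalBoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + 'a>>;

// Type-erased future WITH a `Send` bound, which is what `and_then` needs.
type SendBoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;

// Holds an `Rc` (which is !Send) inside the future. This still coerces to
// the local alias, which is why that type can never be assumed `Send`.
fn add_local<'a>(data: &'a str) -> LocalBoxFuture<'a, usize> {
    let len = Rc::new(data.len());
    Box::pin(async move { *len })
}

// Only captures `Send` data, so it can promise `+ Send` in its signature.
fn add_send<'a>(data: &'a str) -> SendBoxFuture<'a, usize> {
    Box::pin(async move { data.len() })
}

// Compile-time check: only accepts references to `Send` values.
fn is_send<T: Send>(_: &T) -> bool {
    true
}

// is_send(&add_send("abc"))  compiles.
// is_send(&add_local("abc")) is rejected by the compiler, because the
// trait object carries no `Send` bound regardless of what is inside it.
```

The caller cannot add the missing bound after the fact, since the concrete future type has already been erased. That is why the change has to happen in the trait definition itself.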

P.S. Definitely don't do this:

unsafe impl Send is only for cases where you're building a data structure that maintains Rust's thread-safety invariants in a way the compiler can't check automatically (often because you're using raw pointers). You should never use it to "get around" someone else's type being !Send.


Thank you for your help and time - the unsafe was just a try :smiley:

Thanks for the time!!!!!

kind regards

