"Implementation of `FnOnce` is not general enough" with async block

I have a problem getting the following sample code to work:

use futures::{stream, StreamExt, TryStreamExt};
use reqwest::Client;

async fn run() -> Vec<String> {
    let client = &Client::new();
    
    stream::iter(["asdf", "qwer"])
        .map(|s| async move {
            let resp = client.get(format!("https://httpbin.org/get?{s}")).send().await?;
            resp.text().await
        })
        .buffer_unordered(10)
        .try_collect().await.unwrap()
}

#[tokio::main]
async fn main() {
    tokio::spawn(async move {
        println!("{:#?}", run().await);
    });
}

which results in compiler errors:

error: implementation of `std::marker::Send` is not general enough
  --> src/main.rs:18:5
   |
18 | /     tokio::spawn(async move {
19 | |         println!("{:#?}", run().await);
20 | |     });
   | |______^ implementation of `std::marker::Send` is not general enough
   |
   = note: `std::marker::Send` would have to be implemented for the type `&'0 str`, for any lifetime `'0`...
   = note: ...but `std::marker::Send` is actually implemented for the type `&'1 str`, for some specific lifetime `'1`

error: implementation of `FnOnce` is not general enough
  --> src/main.rs:18:5
   |
18 | /     tokio::spawn(async move {
19 | |         println!("{:#?}", run().await);
20 | |     });
   | |______^ implementation of `FnOnce` is not general enough
   |
   = note: closure with signature `fn(&'0 str) -> impl futures::Future<Output = Result<String, reqwest::Error>>` must implement `FnOnce<(&'1 str,)>`, for any two lifetimes `'0` and `'1`...
   = note: ...but it actually implements `FnOnce<(&str,)>`

I understand this is a known compiler issue; e.g., this post has a lot of information about it. Nevertheless, none of the suggested workarounds (at least those I've found and tried) seem to help here, presumably because of the added complexity of the async move block inside the closure. I've tried to implement the workaround that uses a wrapper function to provide an explicit type annotation, but I can't quite figure out the signature and bounds for that function, given that it returns a future.

The code would work if, for instance, I removed the .buffer_unordered() call and replaced .map() with .then(), but then I'd lose concurrency.

It should be possible to make it work with buffer_unordered, shouldn't it?

Thanks.


What a weird error. Anyway, this seems to work:

async fn run() -> Vec<String> {
    let client = &Client::new();
    
    stream::iter(["asdf", "qwer"])
        .map(|s| async move {
            let resp = client.get(format!("https://httpbin.org/get?{s}")).send().await?;
            resp.text().await
        })
        .boxed() // <--- I added this
        .buffer_unordered(10)
        .try_collect().await.unwrap()
}

It should, but as shown by your very detailed post (and the posts it links to), there is a compiler bug that causes problems.

When the Send check happens inside the function body, where lifetimes are inferred and therefore fixed, the compiler correctly sees that all the requirements are met. That is the situation when a type (such as a future or stream) has to be coerced to a dyn Send, as @alice's .boxed() solution does: the check passes, and the value is then type-erased into an always-provably Send type such as dyn Send ....

But:

  1. if that type-erasing / Send-enforcing step doesn't happen within the body of the function,

  2. then the compiler heuristic that tries to guess whether the whole generator / async future is Send ends up having to prove that Send holds for any lifetime:

    • where we used to have, in the scope of the function body, an inferred and thus fixed lifetime with which to reason about Send-ness, the compiler now ends up with a higher-order (universally quantified) lifetime over which the Send-ness requirement must hold.

  3. in certain contrived situations involving a bunch of associated types, such as the ones behind .buffer_unordered() (and, I suspect, some restricted impl of Send somewhere, e.g., in reqwest or futures), Rust is then unable to see that the Send-ness requirement holds for any lifetime, and thus that the resulting generator / async future is Send.

Hence the error.
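
To make the type-erasing step concrete, here is a minimal std-only sketch of the same idea applied to a plain future rather than a stream. It does not reproduce the compiler bug; it only shows where the Send check happens. The names erase_boxed, require_send, shout, erased_example, and block_on are made up for this illustration and are not part of futures, reqwest, or tokio. erase_boxed is an approximation of what .boxed() does for futures (producing a Pin<Box<dyn Future + Send>>), and block_on is a toy executor that assumes the future never actually has to wait.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical helper (not a library API): the `+ Send` bound on the argument
// is checked at the call site, with that call site's concrete lifetimes.
// Afterwards the concrete type is erased into a `dyn Future + Send`.
fn erase_boxed<'a, T>(
    fut: impl Future<Output = T> + Send + 'a,
) -> Pin<Box<dyn Future<Output = T> + Send + 'a>> {
    Box::pin(fut)
}

// Compile-time check only: accepts any value whose type is `Send`.
fn require_send<T: Send>(value: T) -> T {
    value
}

// Waker that does nothing; enough for futures that are ready immediately.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Toy executor (assumption: the future never parks; this loop just spins).
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

// Illustrative async fn that borrows its argument, like the closure over `&str`.
async fn shout(word: &str) -> String {
    word.to_uppercase()
}

fn erased_example() -> String {
    let word = String::from("asdf");
    // The future borrows `word`; its `Send`-ness is settled right here.
    let fut = erase_boxed(shout(&word));
    // The erased type is `Send` by construction, so this always compiles.
    let fut = require_send(fut);
    block_on(fut)
}
```

Calling erased_example() drives the boxed future to completion and returns the upper-cased string. The cost of this variant is one heap allocation per erased future.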


The workarounds

  • Either you tweak your types (e.g., perhaps by massaging the callback's signature so that it becomes properly higher-order) until they meet this more demanding higher-order constraint

    • (but, as I say in that other post, this is unsatisfactory, since the higher-order requirement should not be needed to begin with!)
  • or you "type-erase" into a provably Send type. For this, both dyn Send and impl Send are fine. The annoying part is "un-erasing" everything else so that the type remains usable (e.g., stating that the thing is a Stream with a concrete item type).

This yields the following Box-less solution:

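use futures::Stream;

// Identity function: the `Send` bound is checked where it is called, and the
// concrete type is then hidden behind `impl Send + Stream`.
fn assert_send_stream<R>(
    it: impl Send + Stream<Item = R>,
) -> impl Send + Stream<Item = R> {
    it
}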

With it, we can write:

async fn run() -> Vec<String> {
    let client = &Client::new();
    // `responses` is a stream (not a future) that is now provably `Send`.
    let responses = assert_send_stream(
        stream::iter(["asdf", "qwer"])
            .map(|s| async move {
                let resp = client.get(format!("https://httpbin.org/get?{s}")).send().await?;
                resp.text().await
            })
            .buffer_unordered(10)
    );
    responses
        .try_collect()
        .await
        .unwrap()
}

which does compile fine.
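
For comparison, the same impl Send trick can be sketched for a plain future using only the standard library. This is an illustration under assumptions, not code from futures or tokio: assert_send_future, add_one, run_on_other_thread, and block_on are hypothetical names, and block_on is a toy executor that only works for futures that are ready without waiting. Moving the future into a scoped thread is just a way to show that the returned opaque type really is Send.

```rust
use std::future::Future;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread;

// Hypothetical analogue of `assert_send_stream` for futures: an identity
// function that checks `Send` at the call site and returns an opaque type
// that is known to be `Send`, without boxing.
fn assert_send_future<'a, R>(
    fut: impl Future<Output = R> + Send + 'a,
) -> impl Future<Output = R> + Send + 'a {
    fut
}

// Waker that does nothing; enough for futures that are ready immediately.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Toy executor (assumption: the future never parks; this loop just spins).
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

// Illustrative async fn that borrows its input.
async fn add_one(n: &u32) -> u32 {
    *n + 1
}

fn run_on_other_thread() -> u32 {
    let n = 41;
    let fut = assert_send_future(add_one(&n));
    // `thread::scope` lets the spawned thread borrow `n`; spawning requires
    // the closure (and therefore `fut`) to be `Send`.
    thread::scope(|s| s.spawn(move || block_on(fut)).join().unwrap())
}
```

Here run_on_other_thread() returns 42. Unlike the boxed variant, the wrapper itself does not allocate; the Box::pin inside block_on is only a convenience of this toy executor.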


Thank you @Yandros, @alice, both of these workarounds do the trick! I'm gonna mark the box-less one as a Solution because it's presumably more efficient.