buffer_unordered non-Send when used with references and closures?

I'm running into an issue trying to use futures::stream::BufferUnordered inside an async function whose future I need to be Send. The future appears to stop being Send if I hand a reference to a closure. Using owned values with a closure, or passing a reference to a function, keeps it Send (a function will not work in my actual use case, as I need to close over other values). This is what I'm looking at:

use futures::{stream::iter, StreamExt};

pub fn main() {
    assert_send(owned_and_closure_works()); // OK
    assert_send(ref_and_fn_works()); // OK

    assert_send(ref_and_closure_errors()); // ERROR!
}

fn assert_send<T: Send>(_x: T) {}

async fn make_future(x: &()) -> &() {
    x
}

async fn owned_and_closure_works() {
    let values = vec![()];

    let mut s = iter(values.into_iter().map(|x| async move { x })).buffer_unordered(1);

    let _value = s.next().await;
}

async fn ref_and_fn_works() {
    let values = vec![()];

    let mut s = iter(values.iter().map(make_future)).buffer_unordered(1);

    let _value = s.next().await;
}

async fn ref_and_closure_errors() {
    let values = vec![()];

    let mut s = iter(values.iter().map(|x| async move { x })).buffer_unordered(1);

    let _value = s.next().await;
}

I get a lifetime error that I have not seen before:

error: implementation of `FnOnce` is not general enough
 --> src/main.rs:7:5
  |
7 |     assert_send(ref_and_closure_errors());
  |     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ implementation of `FnOnce` is not general enough
  |
  = note: closure with signature `fn(&'0 ()) -> {async block@src/main.rs:35:44: 35:54}` must implement `FnOnce<(&'1 (),)>`, for any two lifetimes `'0` and `'1`...
  = note: ...but it actually implements `FnOnce<(&(),)>`

Is there any way to resolve this error without rewriting my code to somehow use a function or owned values?

Looks like this issue or this larger root cause.

Here's a workaround, but it has an allocation and indirection cost (use Pin<Box<dyn Future>>).

use std::{future::Future, pin::Pin};

// We need the funnel because Rust is awful at inferring signatures for
// closures that borrow from their arguments.
//
// We need the type erasure because `async` block futures cannot be named.
fn funnel<F>(f: F) -> F
where
    F: for<'a> Fn(&'a ()) -> Pin<Box<dyn Send + Future<Output = &'a ()> + 'a>>,
{
    f
}

// ...

    let mut s = iter(
        values
            .iter()
            .map(funnel(|x| Box::pin(async move { x })))
    ).buffer_unordered(1);
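
For anyone who wants to poke at the funnel pattern without pulling in the futures crate, here is a minimal std-only sketch. It is not the code above: the `BoxFut` alias, `sum_refs`, `block_on` and `NoopWake` are names made up for illustration, and `u32` stands in for `()`. Because it awaits the boxed futures in a plain loop rather than through buffer_unordered, it only shows the shape of the workaround and does not reproduce the original error.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical alias for a type-erased, Send future that may borrow for 'a.
type BoxFut<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;

// Identity function whose only job is to pin down a higher-ranked
// signature for the closure passed through it.
fn funnel<F>(f: F) -> F
where
    F: for<'a> Fn(&'a u32) -> BoxFut<'a, &'a u32>,
{
    f
}

// Compiles only if T is Send; hands the value back so it can still be used.
fn assert_send<T: Send>(x: T) -> T {
    x
}

// Maps each reference to a boxed future via the funnelled closure,
// then awaits the futures one at a time and sums the results.
async fn sum_refs(values: Vec<u32>) -> u32 {
    let mut total = 0;
    let futs = values.iter().map(funnel(|x| Box::pin(async move { x })));
    for fut in futs {
        total += *fut.await;
    }
    total
}

// Waker that does nothing; enough for futures that never return Pending.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Tiny busy-polling executor, for illustration only.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}
```

Calling `block_on(assert_send(sum_refs(vec![1, 2, 3])))` checks the Send bound at compile time and then drives the future to completion. The allocation and indirection cost mentioned above is the `Box::pin` per item.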

This is probably similar to known issues such as

A possible workaround I could come up with is to box the stream, i.e.

-   let mut s = iter(values.iter().map(|x| async move { x })).buffer_unordered(1);
+   let mut s = iter(values.iter().map(|x| async move { x })).buffer_unordered(1).boxed();

Try it and see whether that solves it for you.


Thanks both, boxing does indeed resolve the issue. Given that it's a known limitation, I'm going to see if I can refactor to use owned values instead of refs.

I’ve just come up with a better solution that avoids the boxing entirely; see here.
