I'm running into an issue trying to use futures::stream::BufferUnordered inside an async function whose future I need to be Send. The future appears to lose its Send bound if I pass a reference to a closure. Using owned values with a closure, or passing a reference to a function, retains the Send bound (a function will not work in my actual use case, as I need to close over other values). This is what I'm looking at:
use futures::{stream::iter, StreamExt};

pub fn main() {
    assert_send(owned_and_closure_works()); // OK
    assert_send(ref_and_fn_works()); // OK
    assert_send(ref_and_closure_errors()); // ERROR!
}

fn assert_send<T: Send>(_x: T) {}

async fn make_future(x: &()) -> &() {
    x
}

async fn owned_and_closure_works() {
    let values = vec![()];
    let mut s = iter(values.into_iter().map(|x| async move { x })).buffer_unordered(1);
    let _value = s.next().await;
}

async fn ref_and_fn_works() {
    let values = vec![()];
    let mut s = iter(values.iter().map(make_future)).buffer_unordered(1);
    let _value = s.next().await;
}

async fn ref_and_closure_errors() {
    let values = vec![()];
    let mut s = iter(values.iter().map(|x| async move { x })).buffer_unordered(1);
    let _value = s.next().await;
}
I get a lifetime error that I have not seen before:
error: implementation of `FnOnce` is not general enough
 --> src/main.rs:7:5
  |
7 |     assert_send(ref_and_closure_errors());
  |     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ implementation of `FnOnce` is not general enough
  |
  = note: closure with signature `fn(&'0 ()) -> {async block@src/main.rs:35:44: 35:54}` must implement `FnOnce<(&'1 (),)>`, for any two lifetimes `'0` and `'1`...
  = note: ...but it actually implements `FnOnce<(&(),)>`
Is there any way to resolve this error without rewriting my code to somehow use a function or owned values?
Here's a workaround, though it costs an allocation and an indirection per future because it uses Pin<Box<dyn Future>>:
use std::{future::Future, pin::Pin};

// We need the funnel because Rust's inference is poor for closures that
// borrow their arguments.
//
// We need the type erasure because `async` block futures cannot be named.
fn funnel<F>(f: F) -> F
where
    F: for<'a> Fn(&'a ()) -> Pin<Box<dyn Send + Future<Output = &'a ()> + 'a>>,
{
    f
}

// ...
let mut s = iter(
    values
        .iter()
        .map(funnel(|x| Box::pin(async move { x })))
).buffer_unordered(1);
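For anyone who wants to poke at the funnel trick in isolation, here is a minimal std-only sketch. It is an illustration under assumptions, not the code from the question: it uses u32 instead of (), and the names BoxFut, NoopWake, block_on and sum_borrowed are hypothetical helpers made up for this example. The hand-rolled block_on stands in for a real executor, and the sketch does not reproduce the buffer_unordered error itself; it only shows that the funnel pins the closure to a higher-ranked signature and that the boxed futures it returns are Send.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical alias: a boxed, type-erased future that borrows for `'a` and is `Send`.
type BoxFut<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;

/// Identity function. Its bound makes the compiler check the closure against
/// a signature that holds for every lifetime `'a`, not one inferred lifetime.
fn funnel<F>(f: F) -> F
where
    F: for<'a> Fn(&'a u32) -> BoxFut<'a, &'a u32>,
{
    f
}

/// Compiles only if `T` is `Send`; returns the value unchanged.
fn assert_send<T: Send>(x: T) -> T {
    x
}

/// Waker that does nothing; enough for futures that are ready immediately.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Hypothetical stand-in for an executor: polls in a loop until the future is ready.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

/// Maps each borrowed element through a funneled closure, checks that the
/// resulting future is `Send`, drives it, and sums the borrowed values.
fn sum_borrowed(values: &[u32]) -> u32 {
    let make = funnel(|x| Box::pin(async move { x }));
    let mut total = 0;
    for fut in values.iter().map(make) {
        total += *block_on(assert_send(fut));
    }
    total
}
```

Calling sum_borrowed(&[1, 2, 3]) drives three boxed futures, each borrowing one element of the slice. The cost is the same as in the workaround above: one heap allocation and one dynamic dispatch per future.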
A possible workaround I could come up with is to box the stream, i.e.
- let mut s = iter(values.iter().map(|x| async move { x })).buffer_unordered(1);
+ let mut s = iter(values.iter().map(|x| async move { x })).buffer_unordered(1).boxed();
Thanks both, boxing does indeed resolve the issue. Given that it's a known limitation, I'm going to see if I can refactor to use owned values instead of refs.