Async equivalent of rayon::iter::ParallelIterator

I have the following function:

const COUNT: usize = 100;

pub fn foo<T, C>(constructor: C) -> Vec<T>
where
    T: Send,
    C: Fn(usize) -> T + Sync,
{
    use rayon::iter::{IntoParallelIterator, ParallelIterator};
    (0..COUNT).into_par_iter().map(|arg| constructor(arg)).collect()
}

I would like to write an async equivalent of that function. How should I do that? Is the following idea suitable/reasonable/idiomatic?

pub async fn foo_async<T, C, F>(mut constructor: C) -> Vec<T>
where
    C: FnMut(usize) -> F,
    F: std::future::Future<Output = T>,
{
    use futures::stream::{FuturesOrdered, StreamExt};
    let mut futures = FuturesOrdered::<F>::new();
    for arg in 0..COUNT {
        futures.push_back(constructor(arg));
    }
    futures.collect().await
}

(edit: removed Send bounds from foo_async)

Anything I should be aware of?

I'd probably make use of the FromIterator impl of FuturesOrdered instead of the loop with push_back.

Also you can probably write map(|arg| constructor(arg)) as map(constructor) (though I haven't tested whether it compiles in this case; sometimes these are tricky if the traits don't match 100%).
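To illustrate the trait-matching point, here is a small sketch using plain std `Iterator::map` (the same `map` that feeds `FuturesOrdered::from_iter` in the async version). The function names `build` and `build_by_ref` are hypothetical. `map(constructor)` compiles when the closure's bounds already match what `map` requires, and a wrapping closure is still needed when they don't, e.g. when the constructor takes `&usize` but the range yields `usize`.

```rust
/// Passing the closure itself works when its bounds already match
/// `Iterator::map`'s requirement (`FnMut(Self::Item) -> B`).
fn build<T, C>(constructor: C) -> Vec<T>
where
    C: FnMut(usize) -> T,
{
    (0..4usize).map(constructor).collect()
}

/// Hypothetical variant whose constructor takes `&usize`: the range yields
/// plain `usize` items, so `map(constructor)` would not type-check and a
/// small wrapping closure is needed to borrow each item.
fn build_by_ref<T, C>(mut constructor: C) -> Vec<T>
where
    C: FnMut(&usize) -> T,
{
    (0..4usize).map(|arg| constructor(&arg)).collect()
}
```

For example, `build(|i| i * 2)` yields `[0, 2, 4, 6]`, while `build_by_ref(|i: &usize| *i + 1)` yields `[1, 2, 3, 4]`.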

Like this?

pub async fn foo_async<T, C, F>(constructor: C) -> Vec<T>
where
    C: FnMut(usize) -> F,
    F: std::future::Future<Output = T>,
{
    use futures::stream::{FuturesOrdered, StreamExt};
    FuturesOrdered::from_iter(
        (0..COUNT).into_iter().map(constructor)
    ).collect().await
}

For foo_async it works, but in foo, map(constructor) would require an extra Send bound on C (rayon's map wants the closure to be Send as well as Sync).

Side quest: Is there any way to extend an existing Vec with a FuturesOrdered stream (see also StreamExt)?

pub async fn bar<T, C, F>(vec: &mut Vec<T>, constructor: C)
where
    C: FnMut(usize) -> F,
    F: std::future::Future<Output = T>,
{
    use futures::stream::{FuturesOrdered, StreamExt};
    let futs = FuturesOrdered::from_iter(
        (0..COUNT).into_iter().map(constructor)
    );
    vec.extend(futs).await; // doesn't work obviously
}

I see, makes sense.

Probably calling push in a for_each could work. You could also use an ordinary loop that calls next().await (e.g. while let Some(item) = futs.next().await). I'm not sure which one is nicer or whether the difference matters in any other way (note that FuturesOrdered is Unpin, so calls to next work without issues).

You shouldn't await the extend(); it's a non-async method that doesn't become async just because you are in an async fn.

You probably want

vec.extend(futs.collect::<Vec<_>>().await);

or something like that.

Hmm… this seems to work:

const COUNT: usize = 100;

pub async fn bar<T, C, F>(vec: &mut Vec<T>, constructor: C)
where
    C: FnMut(usize) -> F,
    F: std::future::Future<Output = T>,
{
    use futures::stream::{FuturesOrdered, StreamExt};
    vec.reserve(COUNT);
    FuturesOrdered::from_iter(
        (0..COUNT).into_iter().map(constructor)
    ).for_each(|item| {
        vec.push(item);
        async { () }
    }).await;
}

But the overall syntax is kind of awkward with the async { () }, which is needed because for_each expects the closure to return a future, even though vec.push(item) is synchronous. I guess I'm looking for a method that takes a sync closure instead, or even better, one that accepts an argument implementing Extend. But I didn't find either.

Yeah, well, I just wrote down what I meant to achieve. I didn't really expect it to work that way.

The collect::<Vec<_>>() approach creates an extra Vec that is needlessly allocated and deallocated. But its syntax/readability seems a bit better than the approach using for_each and async { () } above, so I'm not sure which is best.


Using Nightly, I could also do:

#![feature(extend_one)]

const COUNT: usize = 100;

pub async fn bar<T, C, F>(out: &mut impl Extend<T>, constructor: C)
where
    C: FnMut(usize) -> F,
    F: std::future::Future<Output = T>,
{
    use futures::stream::{FuturesOrdered, StreamExt};
    out.extend_reserve(COUNT);
    FuturesOrdered::from_iter(
        (0..COUNT).into_iter().map(constructor)
    ).for_each(|item| {
        out.extend_one(item);
        async { () }
    }).await;
}

Those extend_reserve and extend_one calls are basically what I would like StreamExt to provide instead. Would that be a reasonable feature request? Or are there existing traits that allow doing this without writing it out manually?

P.S.: In practice I will likely just create an intermediate Vec. The allocation/appending won't hurt, I think. But I still feel StreamExt is a bit limited here.
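For reference, on stable Rust the nightly-only extend_one can be approximated by handing a one-element iterator to the stable `Extend::extend`. A minimal std-only sketch; the helpers `push_via_extend` and `fill_one_by_one` are hypothetical names, and the synchronous `for_each` only mimics how a stream's for_each closure receives items one at a time:

```rust
/// Hypothetical helper: a stable stand-in for the nightly-only
/// `Extend::extend_one`, pushing a single item into any `Extend` target.
fn push_via_extend<T, E: Extend<T>>(out: &mut E, item: T) {
    // `std::iter::once` yields exactly one item; `Extend::extend` is stable.
    out.extend(std::iter::once(item));
}

/// Hypothetical helper: feeds items one by one through a callback, the way
/// a stream's `for_each` closure would receive them.
fn fill_one_by_one<T, E: Extend<T>>(out: &mut E, items: impl IntoIterator<Item = T>) {
    items.into_iter().for_each(|item| push_via_extend(out, item));
}
```

Inside the async for_each closure, `out.extend(std::iter::once(item))` would take the place of `out.extend_one(item)`. As far as I know there is no stable generic counterpart to extend_reserve, so reserving capacity is left to concrete types (e.g. `Vec::reserve`).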


Actually, I could use &constructor instead of constructor, since borrowing an Fn + Sync seems to give me an Fn + Send + Sync. (Couldn't the compiler do this implicitly? Borrowing turns any Fn + Sync into an Fn + Send + Sync, right?) Edit: I created a new thread on that issue here.

 const COUNT: usize = 100;
 
 pub fn foo<T, C>(constructor: C) -> Vec<T>
 where
     T: Send,
     C: Fn(usize) -> T + Sync,
 {
     use rayon::iter::{IntoParallelIterator, ParallelIterator};
-    (0..COUNT).into_par_iter().map(constructor).collect()
+    (0..COUNT).into_par_iter().map(&constructor).collect()
 }
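The reason this works: `&C` is `Send` exactly when `C: Sync`, and `&C` implements `Fn` whenever `C` does. A std-only sketch of the same situation, using scoped threads instead of rayon; `par_map` and `foo_sketch` are hypothetical names, with `par_map` mirroring the `Fn + Sync + Send` bound that rayon's `ParallelIterator::map` puts on its closure:

```rust
use std::thread;

/// Hypothetical stand-in for rayon's `map`: it calls `f` from several
/// threads and demands the same `Fn + Sync + Send` bounds.
fn par_map<F>(n: usize, f: F) -> Vec<usize>
where
    F: Fn(usize) -> usize + Send + Sync,
{
    let f = &f; // each spawned thread gets a shared reference to `f`
    thread::scope(|s| {
        let handles: Vec<_> = (0..n).map(|i| s.spawn(move || f(i))).collect();
        // Join in spawn order so the output order matches 0..n.
        handles.into_iter().map(|h| h.join().unwrap()).collect()
    })
}

/// Mirrors `foo`: `C` is only known to be `Fn + Sync`, not `Send`.
fn foo_sketch<C>(constructor: C) -> Vec<usize>
where
    C: Fn(usize) -> usize + Sync,
{
    // `par_map(4, constructor)` would not compile, since nothing says `C: Send`.
    // `&C` is `Send` (because `C: Sync`), `Sync`, and `Fn(usize) -> usize`:
    par_map(4, &constructor)
}
```

A closure capturing a `MutexGuard` (which is `Sync` but not `Send`) is one example of a constructor that can only be passed this way.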
