Idiomatic way of parallel execution of unordered futures?

Hello you lovely people,

Recently I was doing a deep dive into parallel execution of various tasks that take different amounts of time. After banging my head against the wall for an hour -- fruitlessly -- I figured I'd finally RTFM, inform myself, and try to code it properly.

After a lot of experimentation, the shortest version I found is here: Rust Playground

Basic setup: spawn 5 workers that sleep, but make the first-spawned one the slowest, so we have a deterministic way of demonstrating that the fastest one really does finish first (and that the workers are not just run serially).

Shortened version:

use std::time::{Duration, Instant};
use futures::stream::FuturesUnordered;
use tokio::time::sleep;
// (the trace! macro and its setup come from the logging crate used on the playground)

async fn sleep_worker(i: u64) -> u64 {
    let time = Instant::now();
    sleep(Duration::from_millis(100 / i)).await;
    trace!("Worker {} elapsed: {:?}", i, time.elapsed());
    i
}

#[tokio::main]
async fn main() {
    let tasks: FuturesUnordered<_> = (1..=5).map(|x| tokio::spawn(sleep_worker(x))).collect();
    futures::future::join_all(tasks).await;
}

All good, right?

Well, no. First, I am not sure that's actually the recommended/idiomatic way of doing it, and second, there are other options that also work. Listing them:

    let tasks: FuturesUnordered<_> = (1..=5).map(|x| tokio::spawn(sleep_worker(x))).collect();
-    futures::future::join_all(tasks).await;
+    let _results: Vec<_> = tasks.collect().await;

I suppose that one is easy: if you are interested in the results of the tasks, use that method. Or am I missing something?
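
A closely related variant (just a sketch; I believe this is how a FuturesUnordered is really meant to be consumed) pulls each result out of the stream as soon as it completes:

    use futures::stream::{FuturesUnordered, StreamExt};

    let mut tasks: FuturesUnordered<_> =
        (1..=5).map(|x| tokio::spawn(sleep_worker(x))).collect();

    // next() yields each task's output as soon as it finishes,
    // so results arrive in completion order rather than spawn order.
    while let Some(result) = tasks.next().await {
        println!("finished: {:?}", result);
    }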

Using loop and select!:

    use futures::stream::StreamExt;
    use futures::select;
    // `tasks` is the same FuturesUnordered of spawned handles as above,
    // declared `mut` so select_next_some() can pull from it.
    let mut completed = 0;
    loop {
        select! {
            _num = tasks.select_next_some() => {
                completed += 1;
            },
            complete => break,
        }
    }

This works, but it feels like this method is better suited to more diverse workloads, where you wait on very different workers and just want to react to whichever completes first while letting the others continue into the next loop iteration. But do correct me if you feel I am not getting it.


Question is: are there other and better approaches?

This is an exercise for a real project and I would love to make sure I am doing things as the async creators intended.

Any hints are welcome.
Thank you if you got this far.

1 Like

There's quite a serious problem with all of these approaches: you are ignoring errors. In general, I've always considered join_all bad practice because it wraps the Result types in a vector, which hides the compiler's warning about ignored Results. (It also had serious performance problems in the past.)

Here's what I suggest doing:

async fn exercise_out_of_order_execution() {
    let mut handles = Vec::new();
    for i in 1..=5 {
        handles.push(tokio::spawn(sleep_worker(i)));
    }
    
    let mut output = Vec::new();
    for handle in handles {
        output.push(handle.await.unwrap());
    }
    println!("{:?}", output);
}

Here, a common objection is "but I want them to run in parallel!". However, they actually do run in parallel. The tokio::spawn call ensures that.
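
If you want to convince yourself of that, a rough sketch (not a benchmark) is to time the whole thing; with the sleeps above, the total comes out near the longest single sleep (~100 ms), not the sum of all the sleeps (~228 ms):

use std::time::Instant;

let start = Instant::now();

let mut handles = Vec::new();
for i in 1..=5 {
    // All five tasks start running as soon as they are spawned.
    handles.push(tokio::spawn(sleep_worker(i)));
}

let mut output = Vec::new();
for handle in handles {
    // Awaiting in order only affects when each result is observed,
    // not when the tasks themselves run.
    output.push(handle.await.unwrap());
}

// Roughly the longest single sleep, not the sum of the sleeps.
println!("total elapsed: {:?}", start.elapsed());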

There's also the JoinSet type which is being stabilized in the next Tokio release. However, it does not keep the outputs in order.
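
For reference, a rough JoinSet sketch (assuming a Tokio version where JoinSet and its join_next method are available):

use tokio::task::JoinSet;

let mut set = JoinSet::new();
for i in 1..=5 {
    set.spawn(sleep_worker(i));
}

// join_next yields each task's Result as it finishes, so the
// outputs arrive in completion order, not in spawn order.
while let Some(res) = set.join_next().await {
    let value = res.expect("task panicked");
    println!("finished: {}", value);
}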

3 Likes

Wouldn't await-ing them in a for loop introduce overhead? Somewhat petty of me, admittedly, just trying to understand the tradeoffs of all the approaches. To me, join_all simply sounds like the most efficient way of await-ing several futures; that could be a misguided assumption, though, I admit.

You are right about the errors. My problem with that was that I wasn't sure what type of error I would get in the Result, so I kind of skipped over it for the moment -- guess I have more reading to do.

My suggestion has the lowest overhead of all of the ones mentioned here. Using join_all has more overhead because it has to poll everything in the vector at the same time, whereas my loop only polls one at a time.

1 Like

In fact, in most cases where using join_all is an option, it is the highest-overhead option.

1 Like

I am willing to trust you, but I just ran both my first snippet and yours several times -- and yours always seems to take 1-2 ms more.

Compared to what?

In general, any benchmark that involves a sleep is completely flawed. The overheads we are talking about are nowhere near a millisecond large.

1 Like

Here's mine:

    let tasks: FuturesUnordered<_> = (1..=5).map(|x| tokio::spawn(sleep_worker(x))).collect();
    let results: Vec<Result<u64, _>> = futures::future::join_all(tasks).await;
    println!("{:?}", results);

Here's yours:

    let mut handles = Vec::new();
    for i in 1..=5 {
        handles.push(tokio::spawn(sleep_worker(i)));
    }

    let mut output = Vec::new();
    for handle in handles {
        output.push(handle.await.unwrap());
    }
    println!("{:?}", output);

Ran both several times; mine takes 101.0-101.5 ms and yours takes 102.5-103.5 ms.

Hardly a scientific benchmark, surely, but I wanted to point out that a preliminary observation doesn't seem to support the idea that just looping over the handles is more performant.

True. My homework now will be to take your code and mine and test them in real conditions with tracing, telemetry, and the whole shebang.

Uhh, you can't pass a FuturesUnordered to join_all, so that can't possibly be what you are comparing to.

Now you're confusing me because that code runs just fine -- also check the playground, it's all there. I can post crate versions if that helps.

Oh my god, FuturesUnordered implements IntoIterator, which just disassembles it and gives back the futures. This lets you pass it to join_all, as it takes any IntoIterator.

The FuturesUnordered isn't even being used to execute the futures at all in your snippet. It's literally just used as a Vec, except it stores them in a linked list.

The fact that the futures crate allows this in the first place is just silly.

2 Likes

I admit I used FuturesUnordered only because I literally couldn't write code that even compiles and coerces a list of futures into a collection that join_all would accept -- I mean in one go, involving .map(...) and .collect(), not with looping.

Guess my knowledge of Rust traits is not perfect. Any suggestions on how to create a list of futures in one go and not via .push()-ing? I am curious just for the exercise.

If you want to pass it to join_all, then a Vec should do the trick.

Well, I feel stupid because I literally can't figure out how to even produce a vector of futures given, say, a range like (1..=5) -- again, by using map and whatever else is needed after it: collect, into_iter, into, etc. I keep getting errors about having to know the size of each element, or about the async fn's type having to be known ahead of time. I got confused, gave up, and couldn't produce compilable code even now.

It sounds like you're trying to explicitly write down the type of the future?

This should work:

let tasks: Vec<_> = (1..=5).map(|x| tokio::spawn(sleep_worker(x))).collect();
1 Like

Yeah it does, thanks. Indeed I was trying to do stuff like Vec<Future<Output=u64>> but that didn't work.

Thank you.

Right. It's important to realize that Future is not a type. It's a trait. You cannot use it as if it were a type and expect it to work.
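
If you really do want to spell out the type, you have to go through a trait object behind a pointer. A rough sketch (the plain Vec<_> above is simpler in practice):

use std::future::Future;
use std::pin::Pin;

// Naming the type requires boxing each future as a trait object.
let boxed: Vec<Pin<Box<dyn Future<Output = u64>>>> = (1..=5)
    .map(|x| -> Pin<Box<dyn Future<Output = u64>>> { Box::pin(sleep_worker(x)) })
    .collect();

// Note these futures are not spawned, so awaiting them here runs them
// concurrently on this one task rather than in parallel.
let results = futures::future::join_all(boxed).await;
println!("{:?}", results);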

2 Likes

It's also quite important not to dive into Rust async until I've finished the last chapters of the Rust book, but there we go. :grin:

Still going to mark your answer as the solution, since you offered a more intuitive alternative, and to help with discoverability in the future.

Thanks for your help!

You're welcome.