Executing a set of futures concurrently

At a high level, the problem I'm solving is to create a stress-test client using the QUIC protocol. In QUIC, you can use many (hundreds of) streams per connection to send your packets.

I'm trying to write a function that receives a batch of packets and sends them at least concurrently (I'm not sure whether doing it in parallel makes sense). For that, I first started with join_all, but read in the documentation that when there are many futures it is better to use FuturesUnordered:

use futures::stream::{FuturesUnordered, StreamExt};

// this is inside an async function
let data_batch: Vec<Vec<u8>> = ...;
let mut futures = FuturesUnordered::new();
for data in data_batch.into_iter() {
    // send_data doesn't spawn inside
    futures.push(async move { send_data(&connection, &data).await });
}

while let Some(res) = futures.next().await {
    // handle errors
}

Questions:

  1. Does this code lead to concurrent execution of the futures that I push into the FuturesUnordered, or do I need to spawn a task for each one when pushing?
  2. Is it necessary to poll the futures to ensure that they are executed? For example, if I don't really care about the stream errors and just want to maximize transactions per second, do I still need to poll the futures?

Every time you poll FuturesUnordered to try and get the next result (e.g. your futures.next().await call), it will internally poll each of the contained futures that could make progress in turn, until it's either out of futures that could make progress, or it gets a result to return.

This results in concurrency: as each future hits a point where it's awaiting I/O completion, FuturesUnordered will move on to polling the next future. If you also want CPU parallelism (futures actually running on multiple threads at once), you'll need to spawn them as tasks.

Note that there's a footgun here if // handle errors takes significant wall-clock time, since the futures inside FuturesUnordered will be part-way through their work, and will not be polled until the next time you get to the top of the while let loop. You can avoid the footgun by spawning futures and just pushing JoinHandles into the FuturesUnordered, which will cause the futures to run independently; if you're using Tokio, the JoinSet abstraction does this for you.
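Here's a rough sketch of that spawn-then-push variant. The Connection type, the send_data stub, and the Arc clone are stand-ins for whatever the real client uses; the point is only that tokio::spawn starts each send before its result is awaited:

use futures::stream::{FuturesUnordered, StreamExt};
use std::sync::Arc;

// Hypothetical stand-ins for the question's `connection` and `send_data`.
struct Connection;

async fn send_data(_conn: &Connection, _data: &[u8]) -> Result<(), std::io::Error> {
    Ok(())
}

#[tokio::main]
async fn main() {
    let connection = Arc::new(Connection);
    let data_batch: Vec<Vec<u8>> = vec![vec![0u8; 1200]; 100];

    let mut futures = FuturesUnordered::new();
    for data in data_batch {
        let connection = Arc::clone(&connection);
        // tokio::spawn starts the send immediately; the JoinHandle we push
        // only waits for the already-running task, so a slow body in the
        // `while let` loop below no longer stalls the other sends.
        futures.push(tokio::spawn(async move {
            send_data(&connection, &data).await
        }));
    }

    while let Some(joined) = futures.next().await {
        match joined {
            Ok(Ok(())) => {}                                      // send completed
            Ok(Err(e)) => eprintln!("send failed: {e}"),          // send_data's own error
            Err(e) => eprintln!("task panicked/cancelled: {e}"),  // JoinError from the task
        }
    }
}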


From your answer, my understanding is that I really must poll the futures to get concurrency. Handling errors doesn't take long in my case, so I'm fine with that.

Either that, or spawn a separate task for each future, which will instruct the executor to poll them for you.


Is this behaviour documented somewhere? The doc for push just states: "This method adds the given future to the set. This method will not call poll on the submitted future. ..."
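A tiny sketch (assuming the futures crate and a Tokio runtime) that makes that documented behaviour visible — the pushed future's body runs only once the set itself is polled:

use futures::stream::{FuturesUnordered, StreamExt};

#[tokio::main]
async fn main() {
    let mut set = FuturesUnordered::new();
    // push() only stores the future; its body has not run yet.
    set.push(async { println!("polled!") });
    println!("pushed, but not yet polled");
    // The future's body runs only here, once the set is polled,
    // so "pushed, but not yet polled" prints first.
    while let Some(()) = set.next().await {}
}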

I generally recommend using tokio's JoinSet over FuturesUnordered.


I wonder what the background for this is. Do you think that JoinSet, compared to FuturesUnordered:

  • is better maintained, because it is part of the tokio crate
  • has better performance
  • has more features
  • has better method naming (I agree)

jonhoo (in one of his more recent videos) talks about FuturesUnordered. (The section is about JoinSet, but he covers FuturesUnordered as well.)


They aren't really the same thing. JoinSet is like FuturesUnordered<JoinHandle>, so if you're already making FuturesUnordered<JoinHandle>, then JoinSet is better since it has less overhead and is integrated into tokio. It has a few more methods, but the most useful ones are the same or can be remade externally. A lot of FuturesUnordered's API is in StreamExt.

If you aren't spawning your futures, then FuturesUnordered has some advantages:

  • Monomorphized for the exact future you are dealing with, instead of boxing them as trait objects
  • Less overhead
  • Doesn't require 'static (or Send, but usually you will need that anyway if it's being run on tokio)
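For comparison, a minimal sketch of the JoinSet version of the batch send from the question (same hypothetical Connection / send_data stand-ins as in the earlier sketch):

use std::sync::Arc;
use tokio::task::JoinSet;

// Hypothetical stand-ins, as before.
struct Connection;

async fn send_data(_conn: &Connection, _data: &[u8]) -> Result<(), std::io::Error> {
    Ok(())
}

#[tokio::main]
async fn main() {
    let connection = Arc::new(Connection);
    let data_batch: Vec<Vec<u8>> = vec![vec![0u8; 1200]; 100];

    let mut set = JoinSet::new();
    for data in data_batch {
        let connection = Arc::clone(&connection);
        // spawn() starts the task on the runtime right away, just like
        // tokio::spawn + FuturesUnordered<JoinHandle>, but without the
        // extra layer.
        set.spawn(async move { send_data(&connection, &data).await });
    }

    // join_next() yields each task's result as it finishes, in any order.
    while let Some(joined) = set.join_next().await {
        match joined {
            Ok(Ok(())) => {}
            Ok(Err(e)) => eprintln!("send failed: {e}"),
            Err(e) => eprintln!("task panicked/cancelled: {e}"),
        }
    }
}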

Types such as FuturesUnordered, which run many futures without spawning real runtime tasks, have a bunch of footguns that make them hard to use. See Barbara battles buffered streams and this bug. The JoinSet type spawns real runtime tasks, so those issues go away.

