At a high level, the problem I'm solving is writing a stress-test client for the QUIC protocol. In QUIC, you can use many (hundreds of) streams per connection to send your packets.
I'm trying to write a function that receives a batch of packets and sends them at least concurrently (I'm not sure whether doing it in parallel makes sense). I first started with join_all, but read in the documentation that when there are many futures, it is better to use FuturesUnordered:
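```rust
use futures::stream::{FuturesUnordered, StreamExt};

// this is inside an async function
let data_batch: Vec<Vec<u8>> = ...;
// borrow once, so each `async move` block copies the reference
// instead of trying to move `connection` itself on every iteration
let connection = &connection;
let mut futures = FuturesUnordered::new();
for data in data_batch {
    // send_data doesn't spawn internally
    futures.push(async move { send_data(connection, &data).await });
}
while let Some(res) = futures.next().await {
    // handle errors
}
```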
Questions:
Does this code lead to concurrent execution of the futures that I push into the FuturesUnordered? Or do I need to spawn a task for each push?
Is it necessary to poll the futures to ensure that they are executed? For example, if I don't really care about the stream errors and would like to maximize transactions per second, do I still need to poll the futures?
Every time you poll FuturesUnordered to try and get the next result (e.g. your futures.next().await call), it will internally poll each of the contained futures that could make progress in turn, until it's either out of futures that could make progress, or it gets a result to return.
This results in concurrency: as each future reaches a point where it is waiting for I/O to complete, FuturesUnordered moves on to polling the next one. If you also want CPU parallelism (futures running on several threads at once), you'll need to spawn them.
Note that there's a footgun here if // handle errors takes significant wall-clock time: the futures inside FuturesUnordered will be part-way through their work and will not be polled again until you get back to the top of the while let loop. You can avoid it by spawning the futures and pushing only their JoinHandles into the FuturesUnordered, so that the futures run independently; if you're using Tokio, the JoinSet abstraction does this for you.
From your answer, my understanding is that I really must poll the futures to get concurrency. Handling errors doesn't take long in my case, so I'm fine with that.
Is this behavior documented somewhere? The documentation for push just states: "This method adds the given future to the set. This method will not call poll on the submitted future. ..."
They aren't really the same thing. JoinSet is like FuturesUnordered<JoinHandle>, so if you're already building a FuturesUnordered<JoinHandle>, JoinSet is better: it has less overhead and is integrated into Tokio. It has a few more methods, but the most useful ones are the same or can be reimplemented externally. Much of FuturesUnordered's API lives in StreamExt.
If you aren't spawning your futures, FuturesUnordered has some advantages:
Monomorphized for the exact future type you are using, instead of boxing each one as a trait object
Less overhead
Doesn't require 'static (or Send, though you will usually need Send anyway if it's being run on Tokio)
Types such as FuturesUnordered, which run many futures without spawning real runtime tasks, have a number of footguns that make them hard to use correctly. See Barbara battles buffered streams and this bug. JoinSet spawns real runtime tasks, so those issues go away.