Awaiting FuturesUnordered

I have something like this:

#[tokio::main]
async fn main() {
    let mut futures: FuturesUnordered<_> = make_futures();
}

I don't really care about the results of those futures (they will panic if anything goes wrong); I only want to drive them to completion and then exit the app once all of them are done.

Is the right way to do that to simply loop over them, like this?

while futures.next().await.is_some() { }

I guess this feels wrong to me since it almost looks like it's proceeding through them sequentially... (I can see by the logs that it isn't, so I guess FuturesUnordered is kinda like a wrapper of sorts...)

In addition to basic syntax for driving this collection of futures, do I just leave the batching up to tokio, or do I need to control that somehow? In my case each future opens a file, and makes a few network requests, including uploading that file to a remote server. There can be thousands of Futures - it'd be cool if tokio knows how to manage that to take the best advantage of network bandwidth etc.

Typically the loop is written like this, but in your case the two forms are equivalent.

// using Some(()) to verify that return value is ()
while let Some(()) = futures.next().await { }

And yes, using the loop like this is indeed the intended solution. The FuturesUnordered collection is special: its .next() method (from StreamExt) makes progress on all the futures inside it and returns the output of whichever one finishes first.

As for batching, it works equivalently to spawning all of them (except with respect to panics). If you open a connection, Tokio will open the connection. It doesn't try to throttle connections or do anything special, if that's what you're asking. It just does what you tell it.

ok great, thanks!

re: throttling connections... is there a simple way to do that? or some primitives that make it easier?

Haven't thought this through completely, but off the top of my head - maybe something that has a Stream and a counter, with methods to increase/decrease the counter (in a thread-safe way, like an Atomic), where the stream only yields while the counter is below a certain value (and, I suppose, wakes itself when the count changes)? Then the connection code can increase/decrease that counter at the await points...

I dunno... something like that... feels like something which might already be out there somewhere...

Just thinking out loud but I guess the not-awful easy solution for my case (just an internal tooling thing) would just be to split my futures into batches of FuturesUnordered and just await on those batches sequentially...

I'm not aware of any easy counter-based methods for throttling. One common approach is to have a loop like this:

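// Keep at most `max` futures in flight.
while let Some(next_job) = jobs.pop() {
    // At the limit: wait for one to finish before pushing another.
    while futures.len() >= max {
        futures.next().await;
    }
    futures.push(next_job);
}
// Drain whatever is still running.
while let Some(()) = futures.next().await { }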

interesting... so FuturesUnordered can be pushed even while it's being processed? That's neat!

Yes. It only polls the futures while you are inside a call to .next(), so when you are outside of it, you are free to push more items.
