Batch execution of futures in the tokio runtime (or max number of active futures at a time)

It is also worth being aware of FuturesUnordered, which is how buffer_unordered is internally implemented. It is more powerful than buffer_unordered, but a bit more cumbersome to use.

use futures::future::StreamExt;

let urls = vec![...];
let mut futs = FuturesUnordered::new();

for url in urls {
    futs.push(fetch_url_with_reqwest(url));
    
    // If full, wait for one to finish.
    if futs.len() >= 10 {
        let response = futs.next().await.unwrap();
        // handle response
    }
}

// Wait for the remaining to finish.
while let Some(response) = futs.next().await {
    // handle response
}
async fn fetch_url_with_reqwest(url: UrlType) -> ResponseType {
    ...
}

The example above is 100% equivalent to the one using buffer_unordered.

If you need to insert multiple different futures, you might have the compiler start complaining. In that case you can use Either or FutureExt::boxed on the futures to make them into a single type.

3 Likes