Thousands of async calls

I have some working code that uses threads to query multiple devices (thousands) for their values.
(The device protocol is Modbus, but that's not important; it could just as well be HTTP-based.)

This works fine but I want to experiment with async.

I would like one device queried in each future.

Is there a pattern I can use that executes all the futures (thousands / tens of thousands)
and waits for them to complete, so I can add each of their result vectors to one large vector?

For example, would code like the following be OK?

async fn read_ip_port_device(points: Vec<Point>, do_conversion: bool) -> YosemiteResult<Vec<(Point, f64)>> { ... }

let all_points: Vec<Vec<Point>> = get_points_to_process(campus_name, requested_ip);

// Build one future per device. Nothing runs until the futures are awaited.
let mut futures = Vec::with_capacity(all_points.len());
for group in all_points {
    // Each group is moved into its future, so no clone is needed.
    futures.push(read_ip_port_device(group, do_conversion));
}

How do I wait for all these futures?

I see there is futures::future::try_join_all
but the docs say

"If any future returns an error then all other futures will be canceled and an error will be returned immediately"

I would expect some calls to time out or return an error, and I do not wish to stop the others. I would just like to log the failed calls.

Hi Glenn,

I'm no expert in this at all, but a little searching hit upon this thread discussing FuturesUnordered:

The example given seems applicable to you. Maybe this is a good place to start?


I also think FuturesUnordered is the way to go. Here's a usage example:

use futures::stream::{FuturesUnordered, StreamExt};
use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() {
    let mut fut = FuturesUnordered::new();
    fut.push(sleep(Duration::from_millis(100)));
    fut.push(sleep(Duration::from_millis(200)));
    fut.push(sleep(Duration::from_millis(800)));

    while let Some(result) = fut.next().await {
        println!("{:?}", result);
    }
}

Ah, that is helpful. However, I am using the modbus crate, which will cause those futures to block, if I understand the comments in that thread.
I will investigate. I may have to use tokio_modbus.

You could map the Results to Ok and deal with them later.

https://docs.rs/futures/0.3.14/futures/prelude/future/trait.TryFutureExt.html#method.map_ok_or_else

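futures_iter.map(|f| f.map_ok_or_else(|e| Ok::<_, ()>(Err(e)), |v| Ok(Ok(v))))

That wraps every outcome, success or failure, in an outer Ok, so try_join_all never sees an error and keeps going. Note that map_ok_or_else takes the error handler first and the success handler second.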

In case you don't want to immediately start thousands of operations at the same time, and rather have only some active and the rest queued, there's Stream with buffer_unordered.

https://docs.rs/futures/0.3.14/futures/stream/trait.StreamExt.html#method.buffer_unordered
