Process tokio task results in order completed

I'm re-asking a question I asked here (Processing thread results in order they complete) because I'm still holding out hope that there is a Tokio-specific answer that is better than using channels, though perhaps channels are the best solution.

The following simple code spawns 10 tasks that each sleep for a random amount of time and then return some data. I would like main to print the data from each task in the order that the tasks complete, but as written it prints in the order the tasks were created. I understand why it does this, but is there a better way than using channels to get the result I want? Is futures::stream::futures_unordered::FuturesUnordered something I should consider?

use rand::Rng; // trait that must be in scope
use tokio::{
    task,
    time::{sleep, Duration},
};

#[tokio::main(worker_threads = 4)]
async fn main() {
    let mut tasks = Vec::new();
    for i in 0..10 {
        // task::spawn requires a Future and using an async block provides this.
        tasks.push(task::spawn(async move {
            let ms = rand::thread_rng().gen_range(1000..3000);
            sleep(Duration::from_millis(ms)).await;
            (i, ms)
        }));
    }

    for task in tasks {
        let (task_number, sleep_ms) = task.await.unwrap();
        println!("task {} slept for {}ms", task_number, sleep_ms);
    }

    println!("done");
}

I can think of three ways to do this:

  1. Use an mpsc channel.
  2. Put the JoinHandles in a FuturesUnordered.
  3. Use Stream utilities with buffer_unordered.

I would consider all three methods perfectly fine ways of doing it, although the latter two would be more robust against panics in the tasks.

I guess technically there's also this:

use rand::Rng; // trait that must be in scope
use tokio::{
    task,
    time::{sleep, Duration},
};

#[tokio::main(worker_threads = 4)]
async fn main() {
    let mut tasks = Vec::new();
    for i in 0..10 {
        // task::spawn requires a Future and using an async block provides this.
        tasks.push(task::spawn(async move {
            let ms = rand::thread_rng().gen_range(1000..3000);
            sleep(Duration::from_millis(ms)).await;
            println!("task {} slept for {}ms", i, ms);
        }));
    }

    for task in tasks {
        task.await.unwrap();
    }

    println!("done");
}

Here are some examples:

  1. example using FuturesUnordered
  2. example using buffer_unordered

It's worth noting that buffer_unordered uses FuturesUnordered internally.


For future reference by others, here is a solution that works for me using FuturesUnordered. Feedback is welcomed if there is anything that can be improved.

// The StreamExt and Rng traits must be in scope.
use futures::stream::{FuturesUnordered, StreamExt};
use rand::Rng;
use tokio::{
    task,
    time::{sleep, Duration},
};

#[tokio::main(worker_threads = 4)]
async fn main() {
    let mut futures = FuturesUnordered::new();
    for i in 0..10 {
        futures.push(task::spawn(async move {
            let ms = rand::thread_rng().gen_range(1000..3000);
            sleep(Duration::from_millis(ms)).await;
            (i, ms)
        }));
    }

    loop {
        match futures.next().await {
            Some(result) => {
                let (task_number, sleep_ms) = result.unwrap();
                println!("task {} slept for {}ms", task_number, sleep_ms);
            }
            None => {
                break;
            }
        }
    }

    println!("done");
}

You can replace the loop/match with a while let loop. See my example above.


Another option would be to chain the operations onto the end of the futures themselves with something like this:

use std::future::Future;

// Await `fut`, then apply `f` to its output.
async fn chain<Fut, Func, Out>(fut: Fut, f: Func) -> Out
where Fut: Future, Func: FnOnce(Fut::Output) -> Out {
    f(fut.await)
}

I haven't done much async programming, so there may be something wrong with the bounds here. There might also be a standard implementation somewhere.

@Alice Thanks so much for your help on this! I have all three approaches implemented here: https://github.com/mvolkmann/rust-tokio

It seems to me that the channel approach might be the best from a performance perspective. It probably doesn't matter with only 10 tasks. But if I had 1000 tasks, would there potentially be an issue because these lines would continually poll all the tasks to see if they have completed?

    while let Some(result) = futures.next().await { // from main-FuturesUnordered.rs
    while let Some(result) = stream.next().await { // from main-stream.rs

Actually, with FuturesUnordered in particular, that is not a problem, as described in its documentation:

This structure is optimized to manage a large number of futures. Futures managed by FuturesUnordered will only be polled when they generate wake-up notifications. This reduces the required amount of work needed to poll large numbers of futures.

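But it's good to be aware of that issue, because other primitives can suffer from it. join_all, for example, has polled every pending future on each wake-up, although as far as I know newer releases of the futures crate switch to FuturesOrdered internally for larger inputs.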

