Running Tasks in parallel

Using tokio, I have a number of tasks spawned using:
let handle = tokio::spawn(...some future...);

What is the best way to run these tasks in parallel and process the result of each future as it is ready?
I have tried a couple of options:

  1. Using futures::future::select_all:
use futures::future::select_all;

let mut threads = vec![handle1, handle2, handle3....];
while !threads.is_empty() {
   // select_all is a function returning a future, not a macro, so it must be awaited.
   let (result, _index, mut remaining_threads) = select_all(threads).await;
   let new_threads = handle_result(result);
   remaining_threads.extend(new_threads);
   threads = remaining_threads;
}
  2. Using UnorderedFutures:
let mut threads = UnorderedFutures::new();
threads.push(some_task_thread);
while let Some(result) = threads.next().await {
   let new_threads = handle_result(result);
   threads.extend(new_threads);
}

Is there a better way to do this?

There are two good options:

  • FuturesUnordered
  • tokio::sync::mpsc

I'm guessing your second option is just a typo of FuturesUnordered? The way you do it with tokio::sync::mpsc is as follows:

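use tokio::sync::mpsc;

// The receiver must be mutable because recv() takes &mut self.
let (send, mut recv) = mpsc::unbounded_channel();
for job in jobs {
    // Each task gets its own clone of the sender.
    let send_clone = send.clone();
    tokio::spawn(async move {
        let answer = process_job(job);
        // send only fails if the receiver was dropped, so the error is ignored.
        let _ = send_clone.send(answer);
    });
}

// Drop the original sender so the channel closes once every task is done.
drop(send);
while let Some(answer) = recv.recv().await {
    println!("got answer! {:?}", answer);
}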

Here each task sends its answer on a message passing channel. The recv call returns None only once every clone of the sender has been dropped, which is why the code calls drop(send) before the loop: otherwise the original sender would stay alive and the loop would never end.

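The select_all combinator is incredibly inefficient for this: it re-polls the futures in the list each time it is woken, so draining n tasks through it costs on the order of n² polls. Don't use it.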

Thank you so much. And yes, I meant FuturesUnordered :)
In my case, I believe FuturesUnordered would be a better fit, since once I get an answer I'd like to spawn new task(s) to process it, which are then added to the set of futures already running.

Using FuturesUnordered for that is a good choice.
