I'm re-asking a question I asked here (Processing thread results in order they complete) because I'm still holding out hope that there is a Tokio-specific answer that is better than using channels, though perhaps channels are the best solution.
The following simple code spawns 10 tasks that each sleep for a random amount of time and then return some data. I would like main to print the data from each task in the order that the tasks complete. As written, it prints in the order the tasks were created, because the loop awaits the join handles in creation order. I understand why it does this, but is there a better way than using channels to get the result I want? Is futures::stream::futures_unordered::FuturesUnordered something I should consider?
use rand::Rng; // trait that must be in scope
use tokio::{
    task,
    time::{sleep, Duration},
};

#[tokio::main(worker_threads = 4)]
async fn main() {
    let mut tasks = Vec::new();
    for i in 0..10 {
        // task::spawn requires a Future and using an async block provides this.
        tasks.push(task::spawn(async move {
            let ms = rand::thread_rng().gen_range(1000..3000);
            sleep(Duration::from_millis(ms)).await;
            (i, ms)
        }));
    }

    // Awaiting the handles in creation order is what fixes the output order.
    for task in tasks {
        let (task_number, sleep_ms) = task.await.unwrap();
        println!("task {} slept for {}ms", task_number, sleep_ms);
    }
    println!("done");
}
use rand::Rng; // trait that must be in scope
use tokio::{
    task,
    time::{sleep, Duration},
};

#[tokio::main(worker_threads = 4)]
async fn main() {
    let mut tasks = Vec::new();
    for i in 0..10 {
        // task::spawn requires a Future and using an async block provides this.
        tasks.push(task::spawn(async move {
            let ms = rand::thread_rng().gen_range(1000..3000);
            sleep(Duration::from_millis(ms)).await;
            // Printing inside the task reports each result as soon as it is ready.
            println!("task {} slept for {}ms", i, ms);
        }));
    }

    for task in tasks {
        task.await.unwrap();
    }
    println!("done");
}
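For comparison, here is a rough sketch of the channel approach the question refers to. To keep it free of extra crates it makes a few assumptions that are not in the original code: it uses std threads and std::sync::mpsc rather than Tokio tasks, fixed delays instead of rand, and a helper name (recv_in_completion_order) that is purely illustrative. With Tokio, the same shape should carry over using task::spawn and tokio::sync::mpsc.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

/// Hypothetical helper (not a library function): spawns one thread per
/// delay and returns (index, delay) pairs in the order the threads finish.
fn recv_in_completion_order(delays_ms: &[u64]) -> Vec<(usize, u64)> {
    let (tx, rx) = mpsc::channel();
    for (i, &ms) in delays_ms.iter().enumerate() {
        // Each worker gets its own clone of the sender.
        let tx = tx.clone();
        thread::spawn(move || {
            thread::sleep(Duration::from_millis(ms));
            // Ignore the error in case the receiver has already been dropped.
            let _ = tx.send((i, ms));
        });
    }
    // Drop the original sender so the receiving iterator ends once every
    // worker has sent its result and dropped its clone.
    drop(tx);
    // Messages arrive in the order they were sent, i.e. completion order.
    rx.iter().collect()
}

fn main() {
    // Fixed, well-separated delays stand in for the random sleeps.
    for (task_number, sleep_ms) in recv_in_completion_order(&[300, 100, 200]) {
        println!("task {} slept for {}ms", task_number, sleep_ms);
    }
    println!("done");
}
```

With these delays the results should come back as tasks 1, 2, 0. The main cost of this approach is the bookkeeping: every worker needs a sender clone, and the original sender must be dropped or the receive loop never ends.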
For future reference by others, here is a solution that works for me using FuturesUnordered. Feedback is welcomed if there is anything that can be improved.
// The StreamExt and Rng traits must be in scope.
use futures::stream::{FuturesUnordered, StreamExt};
use rand::Rng;
use tokio::{
    task,
    time::{sleep, Duration},
};

#[tokio::main(worker_threads = 4)]
async fn main() {
    let mut futures = FuturesUnordered::new();
    for i in 0..10 {
        futures.push(task::spawn(async move {
            let ms = rand::thread_rng().gen_range(1000..3000);
            sleep(Duration::from_millis(ms)).await;
            (i, ms)
        }));
    }

    // next() yields each result as its task completes and None once the set is empty.
    while let Some(result) = futures.next().await {
        let (task_number, sleep_ms) = result.unwrap();
        println!("task {} slept for {}ms", task_number, sleep_ms);
    }
    println!("done");
}
It seems to me that the channel approach might be the best from a performance perspective. It probably doesn't matter with only 10 tasks. But if I had 1000 tasks, would there potentially be an issue because these lines would continually poll all the tasks to see if they have completed?
while let Some(result) = futures.next().await { // from main-FuturesUnordered.rs
while let Some(result) = stream.next().await { // from main-stream.rs
Actually, with FuturesUnordered in particular, that is not a problem, as described in its documentation:
This structure is optimized to manage a large number of futures. Futures managed by FuturesUnordered will only be polled when they generate wake-up notifications. This reduces the required amount of work needed to poll large numbers of futures.
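But it's good to be aware of that issue, because simpler primitives can suffer from it. join_all, for example, historically re-polled every pending future on each wake-up, although newer versions of the futures crate switch to FuturesOrdered internally when the number of futures is large.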