This weekend I'm looking a bit more into Rust async, and I created this sample program.
My current solution takes 4 futures (max_concurrent) and only after ALL 4 of them are finished does it execute the next 4 (like chunking). This is OK for short-running tasks or if all tasks take roughly equal time, but I would say that at best this is only half async.
Without any limit, all tasks are started at once. Depending on the client it can be problematic to start 200 simultaneous downloads.
So my problems are:

- How can I change the code so that the concurrency maximum is always fully used instead of working in batches? Do I need something like future-queue, or is it necessary to use streams for this? (The first sketch below is roughly what I have in mind.)
- Does it make sense to use tokio::spawn in this context? From my understanding the program uses only one OS thread at the moment. If possible I would like to configure how many threads/cores are used. I played around with spawn, but it executes the task instantly and only the result seems to be available as a future. (The second sketch below is roughly what I tried.)
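To make the first question more concrete, this is roughly the stream-based version I have in mind; buffer_unordered is my guess at the right combinator, and I haven't verified that this is the idiomatic approach:

use futures::stream::{self, StreamExt};
use rand::{thread_rng, Rng};
use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() {
    let max_concurrent: usize = 4;

    // Turn the ids into a stream of futures; buffer_unordered should keep
    // at most `max_concurrent` of them running and start the next one as
    // soon as any finishes (instead of waiting for a whole batch).
    stream::iter(1..=10u64)
        .map(|id| async move {
            let num: u64 = thread_rng().gen_range(10..100);
            println!("START id: {} with {}ms", id, num);
            sleep(Duration::from_millis(num)).await;
            println!("STOP id: {}", id);
        })
        .buffer_unordered(max_concurrent)
        .collect::<Vec<()>>()
        .await;
}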
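And for the second question, this is roughly what my spawn experiment looks like; the worker_threads setting is only my guess at how to control the number of OS threads the runtime uses:

use rand::{thread_rng, Rng};
use tokio::time::{sleep, Duration};

// worker_threads is my guess at how to configure the number of OS threads.
#[tokio::main(worker_threads = 2)]
async fn main() {
    let mut handles = Vec::new();
    for id in 1..=10u64 {
        // spawn starts the task immediately; what I get back is only a
        // JoinHandle, which is itself a future for the result.
        handles.push(tokio::spawn(async move {
            let num: u64 = thread_rng().gen_range(10..100);
            println!("START id: {} with {}ms", id, num);
            sleep(Duration::from_millis(num)).await;
            println!("STOP id: {}", id);
        }));
    }

    // At this point all 10 tasks are already running; there is no
    // max_concurrent limit, which is the part I don't know how to add here.
    for handle in handles {
        handle.await.unwrap();
    }
}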
My experience with async stuff is about 5 days ...
thx & cheers
Peter
Here is my current code directly for reference:
use futures::future::join_all;
use futures::Future;
use rand::{thread_rng, Rng};
use std::mem;
use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() {
    let ids: Vec<u64> = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
    let max_concurrent: usize = 4;

    // Build one future per id; nothing runs yet, since futures are lazy.
    let mut futures = Vec::new();
    for id in ids {
        let future = async move {
            let num: u64 = thread_rng().gen_range(10..100);
            println!("START id: {} with {}ms", &id, &num);
            sleep(Duration::from_millis(num)).await;
            println!("STOP id: {}", id);
        };
        futures.push(future);
    }

    println!(
        "{} futures and max {} parallel executions",
        futures.len(),
        max_concurrent
    );

    execute_futures(futures, max_concurrent).await;
}

// Runs the futures in batches of `max_concurrent`: the next batch only
// starts after the whole previous batch has finished.
async fn execute_futures(futures: Vec<impl Future>, max_concurrent: usize) {
    let mut running_futures = Vec::new();
    for future in futures {
        if running_futures.len() == max_concurrent {
            let _ = join_all(mem::take(&mut running_futures)).await;
        }
        running_futures.push(future);
    }
    // join the rest
    join_all(running_futures).await;
}