FuturesUnordered - Add delay between futures execution

How do I add a delay between tasks in a FuturesUnordered? Below is my code; it starts all the tasks at the same time:

use std::time::Duration;

use chrono::Utc;
use futures::stream::FuturesUnordered;
use futures::StreamExt; // for .next()
use tokio::time::sleep;

// Prints a start timestamp, sleeps for `delay_sec` seconds,
// prints a completion timestamp, and returns `id`.
async fn async_task(id: u32, delay_sec: u64) -> u32 {
    let now = Utc::now();
    println!("Task {} Current time: {}", id, now.format("%Y-%m-%d %H:%M:%S%.3f"));
    sleep(Duration::from_secs(delay_sec)).await;
    let completed_time = Utc::now();
    println!("Task {} completed after {} seconds at {}", id, delay_sec, completed_time.format("%Y-%m-%d %H:%M:%S%.3f"));
    id
}

#[tokio::main]
async fn main() {
    let mut ft = FuturesUnordered::new();

    // Push one future every 2 seconds.
    for i in 0..5 {
        sleep(Duration::from_secs(2)).await;
        ft.push(async_task(i, 1));
    }

    // Drain the results, pausing 2 seconds after each one.
    while let Some(result) = ft.next().await {
        sleep(Duration::from_secs(2)).await;
        println!("{}", result);
    }
}

The FuturesUnordered doesn't start polling its contents until you poll it (via calling ft.next()).

If you want tasks to run while you are starting them slowly, consider using tokio::task::JoinSet instead of FuturesUnordered, which is a way to have a collection of spawned background tasks.


Sorry, can you explain what you mean by "doesn't start polling its contents"? Do you mean getting the results or executing the task itself? I want to start a task, sleep for 100 ms, then start another, and I want to get the results of these tasks as they complete.

do you mean getting the results or executing the task itself?

Those are the same thing. That's the problem. FuturesUnordered doesn't poll (execute) its contained futures except while you’re polling it for results. If you want a true task, something that is executed even while you're not awaiting or otherwise polling it somehow, then you need to spawn it, not merely put it in a FuturesUnordered.

(It would also be possible to express "poll the FuturesUnordered and simultaneously sleep for 2 seconds", but that is probably not a particularly good solution unless you have specific requirements.)


So for my case, where I need to throttle the execution of the futures but also process responses as they finish, what is a better alternative?

Thank you for responding.

You should use JoinSet. It is easier to use correctly. FuturesUnordered is more flexible in some ways, but not ways relevant to your situation.

You could also not use any set at all, but spawn the tasks independently and have them write their results to a channel.
