How to 'time' FuturesUnordered Tasks

Solution Example:

use std::future::Future;
use std::pin::Pin;
use std::time::Duration;

use futures::stream::FuturesUnordered;
use futures::StreamExt;

use tokio::time::{sleep, timeout};

type DynError = Box<dyn std::error::Error + Send + Sync + 'static>;
type DynFuture = Pin<Box<dyn Future<Output = Result<(), DynError>>>>;

async fn task_one() -> Result<(), DynError> {
    println!("Task 1");
    sleep(Duration::from_secs(5)).await;
    println!("Task 1 Done");
    Ok(())
}

async fn task_two() -> Result<(), DynError> {
    println!("Task 2");
    sleep(Duration::from_secs(10)).await;
    println!("Task 2 Done");
    Ok(())
}

async fn task_three() -> Result<(), DynError> {
    println!("Task 3");
    sleep(Duration::from_secs(2)).await;
    println!("Task 3 Done");
    Ok(())
}

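// Wrap `task` in a timeout and push it onto `jobs`. A timeout is converted
// into a `DynError`, so each job still yields a single `Result`.
fn add_job(
    jobs: &mut FuturesUnordered<DynFuture>,
    task: impl Future<Output = Result<(), DynError>> + Send + 'static,
    timeout_duration: Duration,
) {
    jobs.push(Box::pin(async move {
        match timeout(timeout_duration, task).await {
            // The task finished in time: pass its own result through.
            Ok(result) => result,
            // The deadline elapsed first: report it as a boxed error.
            Err(timeout_error) => Err(Box::new(timeout_error) as DynError),
        }
    }));
}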

#[tokio::main]
async fn main() -> Result<(), DynError> {
    println!("Begin.");

    let mut jobs = FuturesUnordered::new();

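    // Use the 6-second limit from the question. A 5-second limit would race
    // with task_one, which itself sleeps for 5 seconds.
    add_job(&mut jobs, task_one(), Duration::from_secs(6));
    add_job(&mut jobs, task_two(), Duration::from_secs(6));
    add_job(&mut jobs, task_three(), Duration::from_secs(6));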

    while let Some(result) = jobs.next().await {
        match result {
            Ok(_) => (),
            Err(error) => {
                println!("Error with task: {:?}", error);
            },
        }
    }

    Ok(())
}
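
Because add_job boxes the timeout error into the same DynError as ordinary task failures, the loop in main cannot tell the two apart just by matching on Err. One possible way to recover that distinction is to downcast the boxed error. The following is a std-only sketch under assumptions: TimedOut is a hypothetical stand-in for the timeout error (with the Tokio code above, the boxed value would be tokio::time::error::Elapsed), and is_timeout is a helper name invented for this illustration.

```rust
use std::error::Error;
use std::fmt;

type DynError = Box<dyn Error + Send + Sync + 'static>;

// Hypothetical stand-in for the timeout error type. With Tokio, the value
// boxed by `add_job` would be `tokio::time::error::Elapsed` instead.
#[derive(Debug)]
struct TimedOut;

impl fmt::Display for TimedOut {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "deadline has elapsed")
    }
}

impl Error for TimedOut {}

// Returns true when the boxed error is the timeout type rather than an
// error produced by the task itself.
fn is_timeout(error: &DynError) -> bool {
    error.downcast_ref::<TimedOut>().is_some()
}
```

In the Err arm of the loop, a check like is_timeout(&error) could then pick a different message for timed-out tasks than for tasks that failed on their own.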

Question:

I have this simple program that runs 3 tasks concurrently using FuturesUnordered.

use std::time::Duration;
use tokio::time::sleep;
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use std::pin::Pin;
use std::future::Future;

async fn task_one() -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
    println!("Task 1");
    sleep(Duration::from_secs(5)).await;
    println!("Task 1 Done");
    Ok(())
}
async fn task_two() -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
    println!("Task 2");
    sleep(Duration::from_secs(10)).await;
    println!("Task 2 Done");
    Ok(())
}
async fn task_three() -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
    println!("Task 3");
    sleep(Duration::from_secs(2)).await;
    println!("Task 3 Done");
    Ok(())
}


#[tokio::main]
async fn main() {
    println!("Begin.");
    let mut jobs = FuturesUnordered::<Pin<Box<dyn Future<Output = Result<(), Box<dyn std::error::Error + Send + Sync + 'static>>>>>>::new();
    jobs.push(Box::pin(task_one()));
    jobs.push(Box::pin(task_two()));
    jobs.push(Box::pin(task_three()));
     
    while let Some(result) = jobs.next().await {
        match result {
            Ok(_) => (),
            Err(_) => {
                println!("Error with task.");
            }
        }
    }
}

The requirement is to return an error when a given task takes longer than X amount of time; let's say 6 seconds for now. So my desired output would be seeing Task 1 Done and Task 3 Done, but not Task 2 Done, since task two sleeps for 10 seconds.

I'm also curious: is there a better way to do this besides using FuturesUnordered? I'm not sure if anything has improved since the last time I tried solving this kind of concurrency problem.

How can this be done with FuturesUnordered? I haven't touched Rust in a while, so any ideas or tips would be highly appreciated. Love the Rust Community 🦀!!

In Tokio you can use tokio::time::timeout to add a timeout to each task.

let max_time = Duration::from_secs(6);
jobs.push(Box::pin(timeout(max_time, task_one())));

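Then the value you get out of the FuturesUnordered will be two nested Results: the outer one reports a timeout and the inner one reports a task failure. The element type of the FuturesUnordered has to change accordingly, since each future now outputs the nested Result instead of the task's own Result.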
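
To make the nesting concrete, here is a small std-only sketch of matching on the three possible outcomes. TimedOut and describe are hypothetical names used only for illustration; with tokio::time::timeout the outer error would be tokio::time::error::Elapsed, and the inner Result is the one returned by the tasks in the question.

```rust
use std::error::Error;
use std::fmt;

type DynError = Box<dyn Error + Send + Sync + 'static>;

// Hypothetical stand-in for the outer (timeout) error type.
#[derive(Debug)]
struct TimedOut;

impl fmt::Display for TimedOut {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "deadline has elapsed")
    }
}

impl Error for TimedOut {}

// The outer `Result` says whether the deadline was met; the inner one is
// the task's own success or failure.
fn describe<E: Error>(outcome: &Result<Result<(), DynError>, E>) -> String {
    match outcome {
        Ok(Ok(())) => "finished".to_string(),
        Ok(Err(task_error)) => format!("task failed: {}", task_error),
        Err(timeout_error) => format!("timed out: {}", timeout_error),
    }
}
```

The same three-arm match could replace the two-arm match in the while let loop once the jobs are wrapped in timeout.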
