Capturing task early returns and exit results with Tokio

I'm trying to write an application using Tokio. The idea is to spawn two tasks that return anyhow::Result<()>. The tasks should be stopped by a cancellation token if the user hits Ctrl+C.
A task returning early should result in the same behavior as if the user had pressed Ctrl+C, with the addition of printing the task's return value (a task shutting down early is suspicious, even if it returned Ok(())).

At exit, when all tasks have shut down, I also want to print the error of any task that returned Err(..).

The code below is basically what I have so far. Unfortunately, the TaskTracker swallows any errors returned at exit.

How should I structure this? I feel like there should be a standard way.

Thanks!

    let ctrl_c = tokio::signal::ctrl_c();
    let mut sig_term = tokio::signal::unix::signal(SignalKind::terminate())?;

    let cancel_token = CancellationToken::new();
    let tracker = TaskTracker::new();

    let prober: JoinHandle<anyhow::Result<()>> = tracker.spawn(
        prober(cancel_token.clone())
        .instrument(info_span!("watcher_task")),
    );

    let cancel_clone = cancel_token.clone();
    let signal_task = tracker.spawn(
        signal_main(&config.signal, cancel_clone, song_update_rx)
            .instrument(info_span!("signal_task")),
    );

    select! {
        res = prober => {
            error!("Watcher task early exit: {res:?}")
        },
        res = signal_task => {
            error!("Signal task early exit: {res:?}")
        }
        _ = ctrl_c => {
            info!("Received Ctrl+C (SIGINT)");
        },
        _ = sig_term.recv() => {
            info!("Received SIGTERM");
        },
    }
    info!("Waiting for tasks to exit");
    cancel_token.cancel();
    tracker.wait().await;
    // Awaiting the handles directly instead of using the tracker panics, because select! has already used the futures
    // if let Err(e) = prober.await? {
    //     error!("Prober exit error: {e}");
    // }
    // if let Err(e) = signal_task.await? {
    //     error!("Signal exit error: {e}");
    // }

Yet there's no logic for that (printing the Err results at exit) taking place in your actual code. You also don't want to select! on both the termination signals and the tasks you're awaiting if you need to check them for Err afterwards:

From the tokio::select! documentation: "Waits on multiple concurrent branches, returning when the first branch completes, cancelling the remaining branches."

You might have been thinking of something along the lines of:

```rust
let cancel_token = CancellationToken::new();
let ctrl_c = tokio::signal::ctrl_c();
// `recv()` takes `&mut self`, so the signal stream has to be mutable
let mut sig_term = {
    let term = SignalKind::terminate();
    tokio::signal::unix::signal(term)?
};

// make sure to `select!` internally inside each one
// against the `cancel_token` itself getting `cancel`led

let prober_task = prober(cancel_token.clone())
    .instrument(info_span!("watcher_task"));

let signal_task = signal_main(&config.signal, cancel_token.clone(), song_update_rx)
    .instrument(info_span!("signal_task"));

// only the signal listener is spawned; all it does is cancel the token
tokio::spawn(async move {
    select!{
        _ = ctrl_c => {
            info!("Received Ctrl+C (SIGINT)");
            cancel_token.cancel();
        },
        _ = sig_term.recv() => {
            info!("Received SIGTERM");
            cancel_token.cancel();
        },
    }
});

// both are plain futures (not spawned), so `join!` yields their `anyhow::Result`s directly
let (prober, signal) = tokio::join!(prober_task, signal_task);

if let Err(e) = prober {
    error!("Prober exit error: {e}");
}
if let Err(e) = signal {
    error!("Signal exit error: {e}");
}
```

Thanks! I had never thought of spawning the signal handlers into a separate task, interesting! And I had totally missed the cancelling part of select!.

I added a ? in the final if statements to unwrap the JoinHandle result:

    if let Err(e) = prober? {

What I still want to add is that the program should stop all tasks as soon as one of them returns: the error is printed immediately, and then the program exits gracefully.

I tried out tokio::try_join!, so that I get the error as soon as it happens. It still leaves me having to join the rest of the tasks (OK, there is only one right now, but I expect more :) ). My adjusted code [1] below shows this. The prober is hardcoded to fail, and the list of remaining tasks is hardcoded as well.

I have started to think that it may be easier for each task to just return () and have the top-level function of each task print/log any Err returned within it and then call cancel_token.cancel(). Then the join in main can be simple: just wait for all the tasks. [2]

[1]

```rust
use tokio::{join, select, signal::unix::SignalKind};
use tokio_util::sync::CancellationToken;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let cancel_token = CancellationToken::new();
    let ctrl_c = tokio::signal::ctrl_c();
    let mut sig_term = {
        let term = SignalKind::terminate();
        tokio::signal::unix::signal(term)?
    };

    let mut prober_task = Box::pin(prober(cancel_token.clone()));
    let mut signal_task = Box::pin(signal_main(cancel_token.clone()));

    let cancel_clone = cancel_token.clone();
    tokio::spawn(async move {
        select! {
            _ = ctrl_c => {
                println!("Received Ctrl+C (SIGINT)");
                cancel_clone.cancel();
            },
            _ = sig_term.recv() => {
                println!("Received SIGTERM");
                cancel_clone.cancel();
            },
        }
    });

    // Returns on the first Err; passing `&mut` keeps the futures usable afterwards
    let join_res = tokio::try_join!(&mut prober_task, &mut signal_task);
    dbg!(&join_res);

    cancel_token.cancel();

    // Filter out the remaining tasks and wait for them to exit
    // (hardcoded: the prober is the task that failed)
    let (signal_res, ) = join!(&mut signal_task);
    if let Err(e) = signal_res {
        println!("Error in signal task: {e}");
    }

    Ok(())
}

async fn prober(cancel_token: CancellationToken) -> anyhow::Result<()> {
    anyhow::bail!("Probe fail"); // hardcoded failure; the loop below is unreachable

    loop {
        select! {
            _ = cancel_token.cancelled() => {
                println!("Prober task cancelled");
                return Ok(());
            }
        }
    }
}

async fn signal_main(cancel_token: CancellationToken) -> anyhow::Result<()> {
    loop {
        select! {
            _ = cancel_token.cancelled() => {
                println!("Signal task cancelled");
                return Ok(());
            }
        }
    }
}
```

[2]

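```rust
use tokio::select;
use tokio_util::sync::CancellationToken;
use tracing::error;

async fn signal_main(cancel_token: CancellationToken) {
    // Log the error here and stop every other task via the shared token
    if let Err(e) = signal_run(cancel_token.clone()).await {
        error!("Signal task exited with error: {e}");
        cancel_token.cancel();
    }
}

async fn signal_run(cancel_token: CancellationToken) -> anyhow::Result<()> {
    loop {
        select! {
            _ = cancel_token.cancelled() => {
                println!("Signal task cancelled");
                return Ok(());
            }
        }
    }
}
```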

So I'm fairly happy with this solution using JoinSet. I even get the ID of the task that exited early :)
The only thing I'm missing is turning the ID into a name.

Rust Playground

```rust
use tokio::{select, signal::unix::SignalKind, task::JoinSet};
use tokio_util::sync::CancellationToken;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let cancel_token = CancellationToken::new();
    let ctrl_c = tokio::signal::ctrl_c();
    let mut sig_term = {
        let term = SignalKind::terminate();
        tokio::signal::unix::signal(term)?
    };

    let mut tasks = JoinSet::new();

    tasks.spawn(prober(cancel_token.clone()));
    tasks.spawn(signal_main(cancel_token.clone()));

    select! {
        Some(res) = tasks.join_next_with_id() => {
            println!("Task early exit");
            dbg!(res);
        }
        _ = ctrl_c => {
            println!("Received Ctrl+C (SIGINT)");
        },
        _ = sig_term.recv() => {
            println!("Received SIGTERM");
        },
    }

    println!("Shutting down");
    cancel_token.cancel();

    while let Some(res) = tasks.join_next_with_id().await {
        // ? to strip join result
        if let (id, Err(e)) = res? {
            println!("Task error: {id} {e}");
        }
    }
    Ok(())
}

async fn prober(cancel_token: CancellationToken) -> anyhow::Result<()> {
    anyhow::bail!("Probe fail");

    loop {
        select! {
            _ = cancel_token.cancelled() => {
                println!("Prober task cancelled");
                return Ok(());
            }
        }
    }
}

async fn signal_main(cancel_token: CancellationToken) -> anyhow::Result<()> {
    loop {
        select! {
            _ = cancel_token.cancelled() => {
                println!("Signal task cancelled");
                return Ok(());
            }
        }
    }
}
```