Cancelling tokio threads and

Hi folks,

It seems I am missing something basic here. I have 2 tasks in my worker, both of which need to run - or the whole worker in which we are should restart.
I use cancellation tokens - the server can receive Cancel command, in which case I want to shutdown the worker so the main loop will start it again.
The problem is that when I simulate an early exit in update_task, then server_task doesn't end - as I would expect (because select! should 'cancel the remaining futures' and/or the server task is spawned with with_cancellation_token_owned). So on the next iteration of the main loop it complains instead that the address is in use.
Both methods I spawn in tokio_task_tracker (tokio_util::task::TaskTracker) use methods that aren't cancel-safe. As loosing data isn't critical at this point, this shouldn't be important, right?

server_task and update_task both return:

tokio::spawn(task_closure)
let update_task = tokio_task_tracker.spawn(
    update_task.update_task()?);


let _server_task = tokio_task_tracker.spawn(
    server.server_task(cancellation_token.clone()).with_cancellation_token_owned(cancellation_token.clone())
);

let _a = tokio::select!(
    _a = _server_task => {
        warn!("main tokio::select: server task ended");
    },
    _x = udpate_task => {
        warn!("main tokio::select: update task ended");
    },
    // _b = cancellation_token.cancelled() => { // is this needed
    //     warn!("main tokio::select: cancel token cancelled");
    // }
);

// in case the server task ends
cancellation_token.cancel();
tokio_task_tracker.close();
match timeout(
    Duration::from_secs(3),
    // future
    tokio_task_tracker.wait(),
).await{
    Ok(_a) => { },
    Err(_e) => {
        bail!("timeout while waiting the cancelled tasks to finish")
    },
};
sleep(Duration::from_millis(100)).await;
Ok(())

I know I have to pay attention to all the threads I spawn and await or process their handles. I have currently only the 4 spawns in my program I mentioned.

Bonus question:
As I am new to async programming and it's still possible I missed something, what strategy would you recommend for 'hard resetting' from a stuck / deadlock state? Some kind of a watchdog (crates.io: Rust Package Registry ?) and put everything in a std thread I terminate after a timeout?

If you want to use CancellationToken, then you need to pass the CancellationToken down into each spawned task, and have that task either check the token or use run_until_cancelled on its work.

This older thread covers some other patterns of cancelling multiple tasks: Tokio: how to abort one task and all its subtasks

1 Like

Indeed my problem was the nested spawn. The outer spawn was cancelled, but the inner thread continued to operate.
Thanks Kevin!