Tokio: how to abort one task and all its subtasks

My question is same as follows:

#[tokio::main]
async fn main() {
    let h = tokio::spawn(async move {
        let hh = tokio::spawn(async move {
            for i in 1..=10 {
                tokio::time::sleep(std::time::Duration::from_secs(1)).await;
                println!("{}", i);
            }
            println!("loop finished");
        });
        hh.await;
    });
    println!("sleep(5)");
    tokio::time::sleep(std::time::Duration::from_secs(5)).await;
    println!("abort");
    h.abort();
    println!("abort done");
    println!("sleep(10)");
    tokio::time::sleep(std::time::Duration::from_secs(10)).await;
    println!("sleep(10) finished");
}

calling abort on handler h only cancel itself, but its subtasks still running.

sleep(5)
1
2
3
4
abort
abort done
sleep(10)
5
6
7
8
9
10
loop finished
sleep(10) finished

In Python 3.9.6 and asyncio, cancel a task can cancel all its subtasks.

import asyncio

async def subtask():
    for i in range(1,11):
        await asyncio.sleep(1)
        print(i)

async def task():
    hh = asyncio.create_task(subtask())
    await hh

async def main():
    h = asyncio.create_task(task())
    print("sleep(5)")
    await asyncio.sleep(5)
    print("abort")
    h.cancel()
    print("abort done")

    print("sleep(10)")
    await asyncio.sleep(10)
    print("sleep(10) finished")

asyncio.run(main())

Here is output:

sleep(5)
1
2
3
4
abort
abort done
sleep(10)
sleep(10) finished

After searching for solutions, I came across this

And try this:

use tokio_util::task::TaskTracker;
#[tokio::main]
async fn main() {
        let tracker = TaskTracker::new();
        let tracker_clone = tracker.clone();

        let h = tracker.spawn(async move {
            let hh = tracker_clone.spawn(async move {
                for i in 1..=10 {
                    tokio::time::sleep(std::time::Duration::from_secs(1)).await;
                    println!("{}", i);
                }
                println!("loop finished");
            });
            let _ = hh.await;
        });

        println!("sleep(5)");
        tokio::time::sleep(std::time::Duration::from_secs(5)).await;
        println!("abort");
        tracker.close();  // is it same as abort?
        println!("abort done");
        println!("sleep(10)");
        tokio::time::sleep(std::time::Duration::from_secs(10)).await;
        println!("sleep(10) finished");

        // Wait for everything to finish.
        tracker.wait().await;
}

I don't know if I misunderstood, but it still can't work.

sleep(5)
1
2
3
4
abort
abort done
sleep(10)
5
6
7
8
9
10
loop finished
sleep(10) finished

By the way, the macro tokio::select! still can not cancel subtaks:

#[tokio::main]
async fn main() {
        let h = tokio::spawn(async move {
            let hh = tokio::spawn(async move {
                for i in 1..=10 {
                    tokio::time::sleep(std::time::Duration::from_secs(1)).await;
                    println!("{}", i);
                }
                println!("loop finished");
            });
            let _ = hh.await;
        });
        let t = tokio::time::sleep(tokio::time::Duration::new(5, 0));
        tokio::select! {
            _ = h => {
                println!("h finish first");
            },
            _ = t => {
                println!("t finish first");
            },
        }

        println!("sleep(10)");
        tokio::time::sleep(std::time::Duration::from_secs(10)).await;
        println!("sleep(10) finished");
}

Can anyone tell me how to achieve an effect similar to that in Python, where canceling a task will also cancel its subtasks

there's no hierachical task structure builtin, "subtask" is not a concept the tokio runtime is aware of. and the JoinHandle type will NOT cancel the task when dropped, that's intentional.

you can use tokio_util::CancellationToken if your tasks can collaboratively work with the cancel event.

otherwise, if you want "forced" cancellation, you must do extra work on top of the tokio runtime. one example is to create a wrapper type for the JoinHandle, which cancels unfinished tasks on drop. (note, there's a type named AbortHandle in tokio, but it does something different, it will NOT drop the associated task).

I also want to mention this tokio_scoped crate which emulate structured concurrency with an API similar to std::thread::scope(). I think your goal can be achieved using this "scoped" API, you can give it a try.

useful links (you can search "structured concurrency tokio" to find more information on this topic):

Thank you for letting me know that creating tasks is independent and nested tasks do not maintain a hierarchical structure.

#[tokio::main]
async fn main() {
        use tokio_util::sync::CancellationToken;

        let cancel_token = CancellationToken::new();
        let token = cancel_token.clone();

        let h = tokio::spawn(async move {
            let hh = tokio::spawn({
                let token = token.clone();
                async move {
                    for i in 1..=10 {
                        if token.is_cancelled() {
                            println!("h cancelled");
                            return;
                        }
                        tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
                        println!("{}", i);
                    }
                    println!("loop finished");
                }
            });

            let _ = hh.await;
        });

        let t = tokio::time::sleep(tokio::time::Duration::new(5, 0));
        tokio::select! {
            _ = h => {
                println!("h finish first");
            },
            _ = t => {
                println!("t finish first");
                cancel_token.cancel();
            },
        }

        println!("sleep(10)");
        tokio::time::sleep(tokio::time::Duration::from_secs(10)).await;
        println!("sleep(10) finished");
}

This is useful, but manual handling cancel event across various tasks is a bit troublesome, especially for the invasion of existing code

Another option is for the tasks to spawn all their subtasks in a JoinSet. Then, when such a task is aborted & dropped, the subtasks will be aborted as well.

1 Like

Thanks, :wink:. Under your suggestion, I found this thread is helpful.

1 Like