Tokio does not terminate all tasks immediately on program exit

Program:

use std::sync::Arc;
use tokio::sync::Mutex;

#[tokio::main]
async fn main() {
    let count = Arc::new(Mutex::new(0));

    for i in 0..5 {
        let my_count = Arc::clone(&count);
        tokio::spawn(async move {
            for j in 0..10 {
                let mut lock = my_count.lock().await;
                *lock += 1;
                println!("{} {} {}", i, j, lock);
            }
        });
    }

    loop {
        let n = *count.lock().await;
        if n >= 30 {
            println!("Count hit {}.", n);
            break;
        }
    }
}

Output:

...
4 5 31
0 6 32
Count hit 32.
3 6 33
2 6 34

This program counts up to 50, but main decides to stop once the count reaches 30 or greater. It's fine if main sees any of 30, 31, 32, and so on. But:

Why would this program output more lines after Count hit ...? When main breaks out of the loop, shouldn't the program end and cancel all the tasks? How can I ensure there is no more output from other tasks after main has decided the program shall terminate?

Spawning a task returns a JoinHandle for that task to the caller. If that handle is dropped, then, per the docs:

A JoinHandle detaches the associated task when it is dropped, which means that there is no longer any handle to the task, and no way to join on it.

[…]

If a JoinHandle is dropped, then the task continues running in the background and its return value is lost.

You can also use the returned handles to abort tasks, though it's generally better design to arrange for tasks to receive a shutdown signal and stop themselves, for example using a broadcast channel to allow main to signal the tasks to terminate.
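
Not code from this thread, but a minimal sketch of that broadcast pattern applied to the original program (the shutdown_tx / shutdown_rx names and the channel capacity are my choices):

use std::sync::Arc;
use tokio::sync::{broadcast, Mutex};

#[tokio::main]
async fn main() {
    let count = Arc::new(Mutex::new(0));
    // One sender kept by main; every task subscribes its own receiver.
    let (shutdown_tx, _) = broadcast::channel::<()>(1);

    for i in 0..5 {
        let my_count = Arc::clone(&count);
        let mut shutdown_rx = shutdown_tx.subscribe();
        tokio::spawn(async move {
            for j in 0..10 {
                tokio::select! {
                    // Stop as soon as main broadcasts shutdown.
                    _ = shutdown_rx.recv() => return,
                    mut lock = my_count.lock() => {
                        *lock += 1;
                        println!("{} {} {}", i, j, lock);
                    }
                }
            }
        });
    }

    loop {
        let n = *count.lock().await;
        if n >= 30 {
            println!("Count hit {}.", n);
            let _ = shutdown_tx.send(()); // every subscribed task sees this
            break;
        }
    }
}

Note that this is still cooperative: a task that has already entered the lock arm for the current iteration can print one more line before it observes the signal at the top of the next iteration.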

Ending the program takes some time; it doesn't terminate immediately after main returns, and any tasks spawned on other threads would still run.

This seems to imply that the (multi-thread) scheduler does not wait for spawned tasks and instead cancels them when the primary task is done. But I also tested with the current-thread scheduler, in which case main always outputs Count hit 50., as if the scheduler waits for all tasks to finish before the program exits. Is this just a discrepancy between the multi-thread and current-thread schedulers? Can I get a detailed explanation of the program's behavior under both schedulers?

abort does not eliminate the problem when using the multi-thread scheduler. I guess that when tasks run on different threads, there is still a race between the abort call and the target thread's execution (one way to close that race is sketched after the code below).

When using the current-thread scheduler, the output is always Count hit 50., which is confusing in a different way.

use std::sync::Arc;

use tokio::{runtime, sync::Mutex};

async fn async_main() {
    let count = Arc::new(Mutex::new(0));
    let mut tasks = vec![];

    for i in 0..5 {
        let my_count = Arc::clone(&count);
        let task = tokio::spawn(async move {
            for j in 0..10 {
                let mut lock = my_count.lock().await;
                *lock += 1;
                println!("{} {} {}", i, j, lock);
            }
        });
        tasks.push(task);
    }

    loop {
        let n = *count.lock().await;
        if n >= 30 {
            println!("Count hit {}.", n);
            for task in tasks {
                task.abort();
            }
            break;
        }
    }
}

fn main() {
    let rt = runtime::Builder::new_current_thread().build().unwrap();
    rt.block_on(async_main());
}
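
One way to guarantee no further output after deciding to stop (a sketch, not code from the thread): call abort on every handle, then await each handle before printing. Awaiting a JoinHandle completes only once the task has truly finished or been cancelled, which closes the race described above.

use std::sync::Arc;
use tokio::sync::Mutex;

#[tokio::main]
async fn main() {
    let count = Arc::new(Mutex::new(0));
    let mut tasks = vec![];

    for i in 0..5 {
        let my_count = Arc::clone(&count);
        tasks.push(tokio::spawn(async move {
            for j in 0..10 {
                let mut lock = my_count.lock().await;
                *lock += 1;
                println!("{} {} {}", i, j, lock);
            }
        }));
    }

    loop {
        if *count.lock().await >= 30 {
            break;
        }
    }

    for task in &tasks {
        task.abort();
    }
    // Each await resolves with Ok(()) if the task finished on its own,
    // or with a JoinError (is_cancelled() == true) if the abort won.
    for task in tasks {
        let _ = task.await;
    }
    // No task can print after this point.
    println!("Count hit {}.", *count.lock().await);
}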

When using the current-thread executor, the only way for the other tasks to run is while main is awaiting. This means they can't do anything between main's final lock acquisition and the program ending, since there are no awaits in that region.

One thing you can do to make this work like you were expecting is to never unlock the mutex.

loop {
    let n = count.lock().await;
    if *n >= 30 {
        println!("Count hit {}.", n);
        // Leak the guard so the mutex is never unlocked; the other
        // tasks then block forever on lock() and can never print again.
        std::mem::forget(n);
        break;
    }
}

This isn't well-suited to a mutex, though.

Async in Rust is cooperative multitasking, and cancellation in a cooperative system is very different from a pre-emptive scheduler. The runtime cannot "cancel" a Future while it is being polled; it can only make that decision at await points. In your example code, the async tasks are too short, and by the time main finishes, there's a good chance all of the spawned tasks are either finished or already scheduled on the worker threads.

If you turn down the number of worker threads, or adjust the async tasks to do more work, you are more likely to observe what you expected.

Note: your example's execution order is nondeterministic with the multi-threaded scheduler; you might get different results from run to run, so run it multiple times to see the difference.

In the following playground, I changed the number of worker threads to 2 and increased the loop count of the spawned tasks to 40:
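
A sketch of roughly that configuration (two worker threads, 40 iterations per task), assuming the rest matches the original program:

use std::sync::Arc;
use tokio::{runtime, sync::Mutex};

fn main() {
    // Two worker threads instead of the default of one per core.
    let rt = runtime::Builder::new_multi_thread()
        .worker_threads(2)
        .build()
        .unwrap();
    rt.block_on(async {
        let count = Arc::new(Mutex::new(0));
        for i in 0..5 {
            let my_count = Arc::clone(&count);
            tokio::spawn(async move {
                // 40 iterations instead of 10, so tasks are still
                // running when main decides to stop.
                for j in 0..40 {
                    let mut lock = my_count.lock().await;
                    *lock += 1;
                    println!("{} {} {}", i, j, lock);
                }
            });
        }
        loop {
            let n = *count.lock().await;
            if n >= 30 {
                println!("Count hit {}.", n);
                break;
            }
        }
    });
}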

Using larger numbers helps with understanding the problem, as they allow more possible outputs. From what I have seen so far, one task cannot instantly terminate another task under the multi-thread scheduler. However, this seems doable under the current-thread scheduler: I never see more lines after Count hit ... with the current-thread scheduler.

Other complexities in the example are due to tokio::sync::Mutex and task polling order. My current understanding is that locking a mutex always yields, and that the Tokio runtime may schedule any task after the current one yields. Task scheduling is a fairly loose guarantee, which explains why there are so many possible outputs.

That is correct, because a single-threaded runtime can only poll one task at a time. Remember that your async fn main() also runs on the same runtime, so once the main task finishes, the runtime shuts down and does not poll any more tasks.

For the multi-threaded runtime, after the main task finishes, other worker threads might still be in the middle of polling spawned tasks. The scheduler sends a signal to each worker thread to notify it that the runtime is shutting down, and waits for it to respond.

But a task currently being polled on a worker thread needs to run to its next await point before that worker thread can notice the signal that the runtime is shutting down.

Maintainer of Tokio here. Tokio does immediately send the cancellation signal to all tasks, but tasks can only be cancelled when they yield at an await. Your tasks are blocking the thread, so they won't be immediately cancelled.
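
To illustrate (my adaptation, not the maintainer's code): giving each task an explicit await point with tokio::task::yield_now() turns every iteration into a cancellation point, so once the runtime starts shutting down, at most the iteration already in progress completes.

use std::sync::Arc;
use tokio::sync::Mutex;

#[tokio::main]
async fn main() {
    let count = Arc::new(Mutex::new(0));

    for i in 0..5 {
        let my_count = Arc::clone(&count);
        tokio::spawn(async move {
            for j in 0..10 {
                {
                    let mut lock = my_count.lock().await;
                    *lock += 1;
                    println!("{} {} {}", i, j, lock);
                } // guard dropped before yielding
                // Unconditional yield: if the runtime has begun shutting
                // down, the task is cancelled at this await point.
                tokio::task::yield_now().await;
            }
        });
    }

    loop {
        let n = *count.lock().await;
        if n >= 30 {
            println!("Count hit {}.", n);
            break;
        }
    }
}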

I didn't notice that you're using the Tokio mutex. Either way, if the lock calls never have to sleep to get the mutex, then they still won't yield.

I opened a GH issue about the Tokio mutex before I saw your reply here. I think what you mean is that there is no await point if the lock can be acquired instantly; that is, the current task does not pass control to another task. On the other hand, the official API docs for lock say:

Locks this mutex, causing the current task to yield until the lock has been acquired.

which left me with the impression that it always yields.

I have another example for you then:

use std::sync::Arc;

use tokio::{runtime, sync::Mutex};

async fn async_main() {
    let count = Arc::new(Mutex::new(0));

    tokio::spawn(async move {
        println!("unreachable");
    });

    loop {
        let _ = *count.lock().await;
    }
}

fn main() {
    let rt = runtime::Builder::new_current_thread().build().unwrap();
    rt.block_on(async_main());
}

The main task is the only task that tries to acquire the lock. If locks do not yield when they can be acquired instantly, then the main task never yields, and other tasks never get a chance to run. Note that I'm using the current-thread scheduler here, so you should never see the output unreachable. But in fact, you do see it.

Okay, I left out some details. The full story is as follows: The Tokio mutex will yield in two cases:

  • Something else holds the lock.
  • Tokio's coop feature kicks in to force it to yield.

The coop feature kicks in once a task has awaited something 128 times in a row without yielding. You can read about it here: Reducing tail latencies with automatic cooperative task yielding, or in the documentation.

If you modify your example to print the iteration counter, then you will see this in action:

use std::sync::Arc;

use tokio::{runtime, sync::Mutex};

async fn async_main() {
    let count = Arc::new(Mutex::new(0));

    tokio::spawn(async move {
        println!("unreachable");
    });

    for i in 0.. {
        println!("{}", i);
        let _ = *count.lock().await;
    }
}

fn main() {
    let rt = runtime::Builder::new_current_thread().build().unwrap();
    rt.block_on(async_main());
}

Output:

0
1
2
3
4
5
[...snip...]
125
126
127
128
unreachable
129
130
131
132
That's a great demystification, and it explains the behavior with the current-thread scheduler: the main task does its first 128 acquires, then yields. The other tasks then run sequentially without yielding, because 10 is smaller than 128.

Still, there are some interesting cases: when increasing 10 to 200 or 20000, the main task does not come back after 128 * 5 operations. So the runtime does not schedule tasks in FIFO order, even when there is only one worker thread. In theory, we could have two tasks A and B, each forced to yield after 128 operations, yet the runtime might still cycle between A and B and starve task C. Given that there is no strong guarantee about task scheduling order, does the 128-ops-per-tick budget alone not completely avoid starvation?

I'm not sure if it's written down anywhere, but Tokio is extremely careful to avoid such starvation. We allow the runtime to cycle between A and B a finite number of times (this can happen due to the LIFO slot or if C ends up in the global queue), but it is guaranteed to eventually choose to poll C over A or B.

Thanks for the acknowledgement. If there are no-starvation or other guarantees about tasks, I believe a top-level module doc (for example, tokio::task) would be a good place for them, since they are very important.

You're welcome to open a documentation bug about this. It would be nice to see this summarized in one place.

Like I said, I already opened a ticket on GH showing where I had gone wrong, which could be a pitfall for other learners too, and you already saw the ticket :wink:

I understood that ticket to be about improving the documentation of Mutex. I think it also makes sense to add a section to the documentation about the scheduler's starvation guarantees.