Async/await and multi-thread Tokio runtime

Hey guys! I'm trying to grasp async/await usage with Tokio runtime.

Take a look at the following code: (Rust Playground)

// Cargo dependency:
// tokio = { version = "1.37.0", features = ["macros", "rt-multi-thread"] }

#[tokio::main(flavor = "multi_thread", worker_threads = 10)]
async fn main() {
    let handle = tokio::runtime::Handle::current();
    assert_eq!(tokio::runtime::RuntimeFlavor::MultiThread, handle.runtime_flavor());
    println!("Handle: {:?}", handle);
    println!("Main: start {}", ct_id());
    let task1 = handle.spawn(async {
        println!("Task 1: start {}", ct_id());
        let handle = tokio::runtime::Handle::current();
        let task2 = handle.spawn(async {
            println!("Task 2: start {}", ct_id());
            std::thread::sleep(std::time::Duration::from_secs(3));
            println!("Task 2: end {}", ct_id());
        });
        std::thread::sleep(std::time::Duration::from_secs(4));
        println!("Task 1: sleeped {}", ct_id());
        task2.await.unwrap();
        println!("Task 1: end {}", ct_id());
    });
    std::thread::sleep(std::time::Duration::from_secs(5));
    println!("Main: sleeped {}", ct_id());
    task1.await.unwrap();
    println!("Main: end {}", ct_id());
}

fn ct_id() -> String {
    let t = std::thread::current();
    format!("'{}' {:?}", t.name().unwrap_or_default(), t.id())
}

Note that std::thread::sleep is there to represent some long-running blocking operation.

Given that we're running in multi-threaded runtime, I would expect something like this to be printed:

Main: start 'main' ThreadId(1)
Task 1: start 'tokio-runtime-worker' ThreadId(2)
Task 2: start 'tokio-runtime-worker' ThreadId(3)
Task 2: end 'tokio-runtime-worker' ThreadId(3)
Task 1: sleeped 'tokio-runtime-worker' ThreadId(2)
Task 1: end 'tokio-runtime-worker' ThreadId(2)
Main: sleeped 'main' ThreadId(1)
Main: end 'main' ThreadId(1)

Instead I'm seeing this:

Main: start 'main' ThreadId(1)
Task 1: start 'tokio-runtime-worker' ThreadId(2)
Task 1: sleeped 'tokio-runtime-worker' ThreadId(2)
Task 2: start 'tokio-runtime-worker' ThreadId(2)
Main: sleeped 'main' ThreadId(1)
Task 2: end 'tokio-runtime-worker' ThreadId(2)
Task 1: end 'tokio-runtime-worker' ThreadId(2)
Main: end 'main' ThreadId(1)

So while I was able to Handle::spawn an asynchronously running task from main, doing so from tokio-runtime-worker does not have the effect I expected - instead, task is executed on the same worker thread despite handle being RuntimeFlavor::MultiThread.

What am I missing here, and how do I really make use of multi-thread flavor? Thanks!

Don't use std::thread::sleep in async fn. sleep is a blocking call. Use tokio::time::sleep instead.

In an async world, tasks need to yield to the scheduler when they can no longer make meaningful progress. CPU-bound tasks that cannot afford to yield frequently need to be run in a separate thread pool to avoid blocking the scheduler.

2 Likes

Tokio doesn't make any guarantees about how it distributes tasks among threads. You still need to use spawn_blocking for blocking tasks, otherwise you'll be suspending the tokio runtime on that thread. Tokio assumes every async task readily awaits.

1 Like

I'm only using std::thread::sleep as an emulation of a heavy blocking task. I'm aware I should use tokio::time::sleep when I really just need to wait for some time.

Interesting. While that workaround does seem to work, I can no longer wait for task2 because spawn_blocking takes a closure and not a future, so I'm not in async context anymore.

Tokio docs for Handle::spawn says:

This spawns the given future onto the runtime's executor, usually a thread pool. The thread pool is then responsible for polling the future until it completes.
The provided future will start running in the background immediately when spawn is called, even if you don't await the returned JoinHandle.

From the description alone, it sounds like it should do what I expect it to do, but it's not what is really happening. What's the point of RuntimeFlavor::MultiThread if it doesn't guarantee idle threads are used for tasks? Is there another way to really make use of all worker threads?

That's still the same problem: don't put blocking tasks in a async block/fn. If you need to do work that could block then use spawn_blocking.

They will be used for tasks, it's just that the internal mechanism that makes this work assumes you don't block inside async blocks/fns, and if that happens (like in your case) then this might not work while the task is blocked.

2 Likes

In your code you would do:

spawn_blocking(std::thread::sleep(std::time::Duration::from_secs(3))).await.unwrap();

You only spawn the blocking part in spawn_blocking. The rest can be spawned with spawn.

1 Like

The point of it is to get more overall throughput, for an application executing many tasks, than a single thread could achieve. That's not necessarily achieved by using a system that always ensures every task will be schedulable promptly even if the already-running task blocks, because inter-thread communication/synchronization also has costs.

3 Likes

The point I am making is do not put heavy blocking tasks into an async runtime's scheduler. Tasks need to be cooperative. See A note on blocking on the Tokio blog.

It sounds like your interpretation is that the task should be scheduled on some idle thread. For various reasons, spawn inserts the task into the local run queue [1]. While the scheduler does implement work stealing, my understanding is that the worker thread still needs to cooperate with other worker threads to allow stealing to occur. Thus, if any worker is blocked none of the others can steal work from it. [2]


Edit: The "worker thread cooperation" appears to be the decision for when to wake idle workers. The current scheduler implementation simply puts workers to sleep when there is no work at all, while the active thread holds the I/O driver. The active thread spawns tasks onto its local run queue but is unable to wake any of the other workers.

One bad task can halt all executor progress forever · Issue #4730 · tokio-rs/tokio (github.com) has a recent-ish conversation about this behavior. The thread provides a few workarounds, but it was closed in favor of rt: Tolerate slow task polls · Issue #6315 · tokio-rs/tokio (github.com).

There is another workaround in rt: add unstable option to disable the LIFO slot by carllerche · Pull Request #4936 · tokio-rs/tokio (github.com) which does exactly what you want [3]. The downsides are that it's an unstable API that will never be stabilized, there is no support for it in the tokio::main macro, and it only resolves one of the potential cases that can block the scheduler. The discussion on tokio's issue #6315 has more details.


  1. There are code paths which may store the task in a global run queue as a fallback, e.g. when the local run queue is full. ↩︎

  2. I may be way off base, here. But this is what I have been able to gather from the tokio code and a number of articles written about it. ↩︎

  3. Tested with the unstable --cfg tokio_unstable and adapted the runtime init from the test. ↩︎

4 Likes

Thanks folks, this behaviour is very counter-intuitive for me. For context, I've made a gRPC server (using Tokio, Tonic and Prost, so most of the surrounding code was auto-generated) and because endpoints themself were blocking, it essentially degraded to single-thread processing - and I still don't think this is an expected result, but if Tokio architects designed it to work this way, I guess I'll just have to live with it. Oh well.

Thanks again for help!

Fundamentally, trying to put blocking stuff on an async runtime will always lead to problems, no matter what you do. We do have a desire to make these things have less bad consequences, but it's not a simple matter, and doing blocking work on Tokio will always be considered incorrect.

3 Likes

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.