Tokio::task::spawn_blocking is always running in [main] thread

New to rust here.

I am trying to run disk IO related tasks and hence i am using tokio::task::spawn_blocking ; however I see form the output that only the 1st task is running, and that too in main thread. What i want is to spin new thread for each number and print all of them in parallel. Any help?

#[tokio::main]
async fn main() {
    env::set_var("RUST_BACKTRACE", "full");
    let mut handles = vec![];
    inits::init_loggers();
    for ctx in 1..10 {
        debug!("triggering... ns:{}", ctx);
// i need await as downstream is async function
        let handle = tokio::task::spawn_blocking(move ||async move{
            loop {
// disk io operation + API calls, + Async taks here. Putting sleep to represent them
                tokio::time::sleep(Duration::from_secs(4 as u64)).await;
                info!("-- hello -- {}", ctx);
            }
        });
        handles.push(handle);
    }
    let mut handle_futs = vec![];
    for handle in handles {
        let f = handle.await.unwrap();
        handle_futs.push(f);
    }

    for handle_fut in handle_futs {
        handle_fut.await;
    }
    set_rust_ctx_arr_path();
    // run().await;
    info!("--- server stopped ---")
}

Output:

06-29 15:16:08 D [main] [rfs] triggering... ns:1 
06-29 15:16:08 D [main] [rfs] triggering... ns:2 
06-29 15:16:08 D [main] [rfs] triggering... ns:3 
06-29 15:16:08 D [main] [rfs] triggering... ns:4 
06-29 15:16:08 D [main] [rfs] triggering... ns:5 
06-29 15:16:08 D [main] [rfs] triggering... ns:6 
06-29 15:16:08 D [main] [rfs] triggering... ns:7 
06-29 15:16:08 D [main] [rfs] triggering... ns:8 
06-29 15:16:08 D [main] [rfs] triggering... ns:9 

06-29 15:16:12 I [main] [rfs] -- hello -- 1 
06-29 15:16:16 I [main] [rfs] -- hello -- 1 
06-29 15:16:20 I [main] [rfs] -- hello -- 1 
06-29 15:16:24 I [main] [rfs] -- hello -- 1 
06-29 15:16:28 I [main] [rfs] -- hello -- 1 

The code running in the blocking context does nothing but immediately return a future that you then await in the handle_fut.await line.

If you want to run disk IO in a blocking task, you should just do that, removing the extra async move { ... } wrapper.

I want to run the disk ios in parallel threads. I dont want to wait for each blocking call to complete. The blocking happens for 4-5 sec and i have to do several of them. The above code is not running in parallel.

The move, async were written as they represent the actual usecase. the above code is to simulate the issue only

So you say only synchronous calls should be used in sync_blocking ? But my requirement is to have a dedicated thread for every ctx ; If i use tokio::task::spawn I see that only 1 worker thread is being used, even though the tasks are happening in parallel

The code isn't running in parallel due to the reasons explained in the reply:
tokio::spawn_blocking "Runs the provided closure on a thread where blocking is acceptable."

The provided closure in this case is

move || async move {
    loop {
        tokio::time::sleep(Duration::from_secs(4 as u64)).await;
        info!("-- hello -- {}", ctx);
    }
}

When this closure is ran, it immediately returns the value

async move {
    loop {
        tokio::time::sleep(Duration::from_secs(4 as u64)).await;
        info!("-- hello -- {}", ctx);
    }
}

It does not execute the future. That only happens when you await the future, which you do one by one here

for handle in handles {
    let f = handle.await.unwrap();
    handle_futs.push(f);
}

Therefore, the futures you create are executed serially. The solution is in the first reply, you should not return an async move block from the spawn_blocking closure. You should use tokio::task::spawn for cases similar to spawn_blocking but when you want to run async code.

2 Likes

hey thank you for the detailed reply. I now get the point.

I tried with tokio::task::spawn --> But i see only 1 thread being run ( how did i know? the logger prints the thread name and its the same always)

Each sleep is a costly disk io , api calls and awaits on several async function --meaning the closure is async only I dont have choice. Then how do i get them run in several threads --instead of 1 thread when i use tokio::task::spawn

If you need to mix blocking and async operations in one sequence, use block_on() to work with async:

use tokio::task::spawn_blocking;
use tokio::runtime::Handle;

async fn example() {
    let handle = Handle::current();
    spawn_blocking(move || {
        // do something blocking
        
        handle.block_on(async {
            // do something async
        });
        
        // do something blocking
        
        // ...
    });
}
2 Likes

Thank you for the reply. But i think the issue still persists. I see only 1 thread being created and all spawn_blocking-s are using the same thread (-- [tokio-runtime-worker] see the output). This behavior is same as tokio::task::spawn -- except that all run in 1 thread only

new code:

#[tokio::main(flavor = "multi_thread", worker_threads = 200)]
// #[tokio::main]
async fn main() {
  let mut handles = vec![];
    inits::init_loggers();
    for ctx in 1..10 {
        debug!("triggering... ns:{}", ctx);
        // i need await as downstream is async function
        let current = Handle::current();
        let handle = tokio::task::spawn_blocking(move || {
            loop {
//moved 'current' declaration here. Yet the same output
                // disk io operation + API calls, + Async taks here. Putting sleep to represent them
                current.block_on(tokio::time::sleep(Duration::from_secs(4 as u64)));
                info!("-- hello -- {}", ctx);
            }
        });
        handles.push(handle);
    }
    let mut handle_futs = vec![];
    for handle in handles {
        let f = handle.await.unwrap();
        handle_futs.push(f);
    }

    // for handle_fut in handle_futs {
    //     handle_fut.await;
    // }
    // run().await;
    info!("--- server stopped ---")

Output: format mm-dd time Level [thread name] [project name] logger msg

06-29 17:00:14 D [main] [rfs] triggering... ns:1 
06-29 17:00:14 D [main] [rfs] triggering... ns:2 
06-29 17:00:14 D [main] [rfs] triggering... ns:3 
06-29 17:00:14 D [main] [rfs] triggering... ns:4 
06-29 17:00:14 D [main] [rfs] triggering... ns:5 
06-29 17:00:14 D [main] [rfs] triggering... ns:6 
06-29 17:00:14 D [main] [rfs] triggering... ns:7 
06-29 17:00:14 D [main] [rfs] triggering... ns:8 
06-29 17:00:14 D [main] [rfs] triggering... ns:9 
06-29 17:00:18 I [tokio-runtime-worker] [rfs] -- hello -- 2 
06-29 17:00:18 I [tokio-runtime-worker] [rfs] -- hello -- 9 
06-29 17:00:18 I [tokio-runtime-worker] [rfs] -- hello -- 3 
06-29 17:00:18 I [tokio-runtime-worker] [rfs] -- hello -- 4 
06-29 17:00:18 I [tokio-runtime-worker] [rfs] -- hello -- 5 
06-29 17:00:18 I [tokio-runtime-worker] [rfs] -- hello -- 6 
06-29 17:00:18 I [tokio-runtime-worker] [rfs] -- hello -- 7 
06-29 17:00:18 I [tokio-runtime-worker] [rfs] -- hello -- 8 

Looks like you may need to enable logging the thread id as opposed to the thread name. When I run your code with this logger setup:

fn init_loggers() {
    tracing_subscriber::fmt()
        .with_thread_names(true)
        .with_thread_ids(true)
        .init()
}

I see this output:

2023-06-29T18:00:20.165334Z  INFO tokio-runtime-worker ThreadId(207) rplf_tokio_thread: -- hello -- 5
2023-06-29T18:00:20.165378Z  INFO tokio-runtime-worker ThreadId(204) rplf_tokio_thread: -- hello -- 2
2023-06-29T18:00:20.165385Z  INFO tokio-runtime-worker ThreadId(210) rplf_tokio_thread: -- hello -- 8
2023-06-29T18:00:20.165423Z  INFO tokio-runtime-worker ThreadId(208) rplf_tokio_thread: -- hello -- 6
2023-06-29T18:00:20.165447Z  INFO tokio-runtime-worker ThreadId(202) rplf_tokio_thread: -- hello -- 3
2023-06-29T18:00:20.165290Z  INFO tokio-runtime-worker ThreadId(209) rplf_tokio_thread: -- hello -- 7
2023-06-29T18:00:20.165509Z  INFO tokio-runtime-worker ThreadId(206) rplf_tokio_thread: -- hello -- 9
2023-06-29T18:00:20.165449Z  INFO tokio-runtime-worker ThreadId(203) rplf_tokio_thread: -- hello -- 1
2023-06-29T18:00:20.165449Z  INFO tokio-runtime-worker ThreadId(205) rplf_tokio_thread: -- hello -- 4
2023-06-29T18:00:24.167725Z  INFO tokio-runtime-worker ThreadId(204) rplf_tokio_thread: -- hello -- 2
2023-06-29T18:00:24.167769Z  INFO tokio-runtime-worker ThreadId(206) rplf_tokio_thread: -- hello -- 9
2023-06-29T18:00:24.167880Z  INFO tokio-runtime-worker ThreadId(208) rplf_tokio_thread: -- hello -- 6
2023-06-29T18:00:24.167725Z  INFO tokio-runtime-worker ThreadId(207) rplf_tokio_thread: -- hello -- 5
2023-06-29T18:00:24.167728Z  INFO tokio-runtime-worker ThreadId(210) rplf_tokio_thread: -- hello -- 8
2023-06-29T18:00:24.167747Z  INFO tokio-runtime-worker ThreadId(202) rplf_tokio_thread: -- hello -- 3
2023-06-29T18:00:24.167743Z  INFO tokio-runtime-worker ThreadId(205) rplf_tokio_thread: -- hello -- 4
2023-06-29T18:00:24.167743Z  INFO tokio-runtime-worker ThreadId(209) rplf_tokio_thread: -- hello -- 7
2023-06-29T18:00:24.167765Z  INFO tokio-runtime-worker ThreadId(203) rplf_tokio_thread: -- hello -- 1
^C

So we can see they have the same name but different IDs!

Also see here: https://doc.rust-lang.org/stable/std/thread/struct.ThreadId.html

ETA:
Running ps -o thcount <pid> gives:

THCNT
  210
1 Like

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.