I am trying to run disk IO related tasks and hence i am using tokio::task::spawn_blocking ; however I see form the output that only the 1st task is running, and that too in main thread. What i want is to spin new thread for each number and print all of them in parallel. Any help?
#[tokio::main]
async fn main() {
env::set_var("RUST_BACKTRACE", "full");
let mut handles = vec![];
inits::init_loggers();
for ctx in 1..10 {
debug!("triggering... ns:{}", ctx);
// i need await as downstream is async function
let handle = tokio::task::spawn_blocking(move ||async move{
loop {
// disk io operation + API calls, + Async taks here. Putting sleep to represent them
tokio::time::sleep(Duration::from_secs(4 as u64)).await;
info!("-- hello -- {}", ctx);
}
});
handles.push(handle);
}
let mut handle_futs = vec![];
for handle in handles {
let f = handle.await.unwrap();
handle_futs.push(f);
}
for handle_fut in handle_futs {
handle_fut.await;
}
set_rust_ctx_arr_path();
// run().await;
info!("--- server stopped ---")
}
Output:
06-29 15:16:08 D [main] [rfs] triggering... ns:1
06-29 15:16:08 D [main] [rfs] triggering... ns:2
06-29 15:16:08 D [main] [rfs] triggering... ns:3
06-29 15:16:08 D [main] [rfs] triggering... ns:4
06-29 15:16:08 D [main] [rfs] triggering... ns:5
06-29 15:16:08 D [main] [rfs] triggering... ns:6
06-29 15:16:08 D [main] [rfs] triggering... ns:7
06-29 15:16:08 D [main] [rfs] triggering... ns:8
06-29 15:16:08 D [main] [rfs] triggering... ns:9
06-29 15:16:12 I [main] [rfs] -- hello -- 1
06-29 15:16:16 I [main] [rfs] -- hello -- 1
06-29 15:16:20 I [main] [rfs] -- hello -- 1
06-29 15:16:24 I [main] [rfs] -- hello -- 1
06-29 15:16:28 I [main] [rfs] -- hello -- 1
I want to run the disk ios in parallel threads. I dont want to wait for each blocking call to complete. The blocking happens for 4-5 sec and i have to do several of them. The above code is not running in parallel.
The move, async were written as they represent the actual usecase. the above code is to simulate the issue only
So you say only synchronous calls should be used in sync_blocking ? But my requirement is to have a dedicated thread for every ctx ; If i use tokio::task::spawn I see that only 1 worker thread is being used, even though the tasks are happening in parallel
The code isn't running in parallel due to the reasons explained in the reply: tokio::spawn_blocking "Runs the provided closure on a thread where blocking is acceptable."
It does not execute the future. That only happens when you await the future, which you do one by one here
for handle in handles {
let f = handle.await.unwrap();
handle_futs.push(f);
}
Therefore, the futures you create are executed serially. The solution is in the first reply, you should not return an async move block from the spawn_blocking closure. You should use tokio::task::spawn for cases similar to spawn_blocking but when you want to run async code.
hey thank you for the detailed reply. I now get the point.
I tried with tokio::task::spawn --> But i see only 1 thread being run ( how did i know? the logger prints the thread name and its the same always)
Each sleep is a costly disk io , api calls and awaits on several async function --meaning the closure is async only I dont have choice. Then how do i get them run in several threads --instead of 1 thread when i use tokio::task::spawn
Thank you for the reply. But i think the issue still persists. I see only 1 thread being created and all spawn_blocking-s are using the same thread (-- [tokio-runtime-worker] see the output). This behavior is same as tokio::task::spawn -- except that all run in 1 thread only
new code:
#[tokio::main(flavor = "multi_thread", worker_threads = 200)]
// #[tokio::main]
async fn main() {
let mut handles = vec![];
inits::init_loggers();
for ctx in 1..10 {
debug!("triggering... ns:{}", ctx);
// i need await as downstream is async function
let current = Handle::current();
let handle = tokio::task::spawn_blocking(move || {
loop {
//moved 'current' declaration here. Yet the same output
// disk io operation + API calls, + Async taks here. Putting sleep to represent them
current.block_on(tokio::time::sleep(Duration::from_secs(4 as u64)));
info!("-- hello -- {}", ctx);
}
});
handles.push(handle);
}
let mut handle_futs = vec![];
for handle in handles {
let f = handle.await.unwrap();
handle_futs.push(f);
}
// for handle_fut in handle_futs {
// handle_fut.await;
// }
// run().await;
info!("--- server stopped ---")
Output: format mm-dd time Level [thread name] [project name] logger msg
06-29 17:00:14 D [main] [rfs] triggering... ns:1
06-29 17:00:14 D [main] [rfs] triggering... ns:2
06-29 17:00:14 D [main] [rfs] triggering... ns:3
06-29 17:00:14 D [main] [rfs] triggering... ns:4
06-29 17:00:14 D [main] [rfs] triggering... ns:5
06-29 17:00:14 D [main] [rfs] triggering... ns:6
06-29 17:00:14 D [main] [rfs] triggering... ns:7
06-29 17:00:14 D [main] [rfs] triggering... ns:8
06-29 17:00:14 D [main] [rfs] triggering... ns:9
06-29 17:00:18 I [tokio-runtime-worker] [rfs] -- hello -- 2
06-29 17:00:18 I [tokio-runtime-worker] [rfs] -- hello -- 9
06-29 17:00:18 I [tokio-runtime-worker] [rfs] -- hello -- 3
06-29 17:00:18 I [tokio-runtime-worker] [rfs] -- hello -- 4
06-29 17:00:18 I [tokio-runtime-worker] [rfs] -- hello -- 5
06-29 17:00:18 I [tokio-runtime-worker] [rfs] -- hello -- 6
06-29 17:00:18 I [tokio-runtime-worker] [rfs] -- hello -- 7
06-29 17:00:18 I [tokio-runtime-worker] [rfs] -- hello -- 8