I have a Rust app that is basically an axum web server with some routes.
Now I also need to periodically check whether a database table has any new rows.
If it does, I need to run some HEAVY computations that can take minutes (especially in an undersized Docker container).
The code I'm using is below, and the output is:
The main thread should not be blocked and should print every second: Instant { t: 6479.5889821s }
The main thread should not be blocked and should print every second: Instant { t: 6480.5996495s }
The main thread should not be blocked and should print every second: Instant { t: 6481.6152853s }
Starting an heavy CPU computation...
The main thread should not be blocked and should print every second: Instant { t: 6502.5748215s }
The main thread should not be blocked and should print every second: Instant { t: 6503.5917731s }
The main thread should not be blocked and should print every second: Instant { t: 6504.5990575s }
As you can see, the "Starting an heavy CPU computation..." step blocks the main thread: the once-per-second prints stop for about 21 seconds.
Is there a way to avoid this?
Should I use tokio::task::spawn_blocking() for each heavy job?
Can I start the entire "worker" in a separate thread? Because I have many different jobs.
I mean this code in main():
let worker = Queue::new();
tokio::spawn(async move { worker.run().await }); // Is there a way to launch this in a separate thread?
You could create a second runtime dedicated to the job queue, I guess. Tokio doesn't give you enough control over the multi-threaded runtime's thread pool to dedicate a whole thread to your job. Also, I wonder whether the same footgun applies to for_each_concurrent as it does to FuturesUnordered.
spawn_blocking is made for exactly this use case. Another option is to spread yield_now().await calls across your CPU-bound work, making sure one of them is hit at least once per second, though this is easier to get wrong, and you can still end up with blocking code.
#[tokio::main]
async fn main() {
    let worker = Queue::new();
    std::thread::spawn(move || {
        // What should I call here?
        tokio::spawn(async move { worker.run().await });
    });
    loop {
        println!(
            "The main thread should not be blocked and should print every second: {:?}",
            Instant::now()
        );
        tokio::time::sleep(Duration::from_secs(1)).await;
    }
}
If I run this, I get this panic:
there is no reactor running, must be called from the context of a Tokio 1.x runtime
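use std::thread;
use tokio::runtime;
thread::spawn(move || {
    // A separate single-threaded runtime that lives only on this thread.
    let rt = runtime::Builder::new_current_thread()
        .enable_all() // needed if the worker uses Tokio timers or I/O
        .build()
        .unwrap();
    rt.block_on(async move {
        // your job worker
    });
});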
Runs a future to completion on the Tokio runtime. This is the runtime’s entry point.
This runs the given future on the current thread, blocking until it is complete, and yielding its resolved result. Any tasks or timers which the future spawns internally will be executed on the runtime.