My program subscribes for updates on redis which only has blocking interface.
Currently what i do is basically to create an infinite loop and wait for messages.
I have an async library I need to use and I converted the function that is looping into async but I still must listen to the events on redis and basically that the only thing my program does listens to the events and reacts to them.
Is it okay to use my event listener loop in an async function and call async functions inside if not what else can I do ?.
PS. I am using tokio threaded runtime and my intuition is one of the threads would be blocked for the loop but other threads will still be processing the async jobs started in the main loop thread (I hope)
Tokio offers spawn_blocking
. You can probably put a looping "future" there.
The best approach would probably be a dedicated thread that just does the loop for incoming updates and immediately spawns a future to respond to it when receiving an event, and nothing more.
Putting the contents of each loop iteration into spawn_blocking
is fine. Putting the entire loop inside spawn_blocking
is not ok due to this requirement:
This function is intended for non-async operations that eventually finish on their own. If you want to spawn an ordinary thread, you should use
thread::spawn
instead.(source:
spawn_blocking
doc)
Putting the entire loop into thread::spawn
is fine.
async fn blocking(){
loop{
//listen to the event
task::spawn(otherAyncFn(eventDetails))
}
}
what I want is something like this in between the event ticks my program is literally doing nothing and the main thing is the loop started by the blocking
function above.
need i start it in spawn_blocking anyway ?
@CAD97 @alice
You said your event code was blocking, so I was thinking something like this:
async fn blocking(){
loop{
let event_details = tokio::spawn_blocking(|| {
// listen to event here
}).await.unwrap();
task::spawn(otherAyncFn(eventDetails))
}
}
what is the difference between
//...
let details = redis.get_message(...).unwrap();
task::spawn(otherFunc(defails))
and
//...
let details = task::spawn_blocking(||redis.get_message(...).unwrap()).await.unwrap();
task::spawn(otherFunc(defails))
?
This section should explain it: CPU-bound tasks and blocking code
I read that part but it sounded more like if I want to be able to run more than one cpu intensive thing in parallel to use in my case there is always one loop.
But actually I am not putting the loop in spawn_blocking I guess thats the difference so I can handle multiple get messages at once.
wont it use all the worker threads for the blocking tasks in this case since it is in a loop ?
The .await
makes your loop wait for the spawn_blocking
to finish. So there will only be one redis operation at the time.
Btw I have a single connection I dont think I can move it into spawn_blocking. my workflow is as follows:
loop
createConnection:
loop
wait for event if connection fails go to createConnection
spawn a nonblocking task with event details
so I have a single instance of the connection and I want to spawn new tasks with the data rather than creating new connections and passing them into spawn_blocking calls.
so what I am really asking is is it okay to block one async thread(or whetever it is named) and spawn tasks from the one that is blocking. Will there be any catches or anything that prevents the spawned tasks from being completed.
In js for instance if there is a blocking task running a callback pushed back to the stack from a Promise will never be executed since the only thread executes things is busy.
It is never okay to block an async thread. Tokio uses more than one thread, but it doesn't have very many. If you block all of them, you have the same problem as in Javascript.
I recommend this:
use tokio::runtime::Handle;
// On Tokio's thread. (i.e. inside an async fn)
let handle = Handle::current();
thread::spawn(move || {
loop {
let details = redis.get_message(...).unwrap();
// We can't use tokio::spawn outside of Tokio's threads.
handle.spawn(handle_redis(details));
}
});
Note that you can only use tokio::spawn
from Tokio's threads, so you can't use tokio::spawn
in your manually spawned thread. You can instead get a Handle
to the runtime, which you can move into the new thread and use to spawn tasks on the runtime.
everything in my case except the get_message function are async fucntions so it is guaranteed to block a single thread always
Yeah, it sounds like spawning a thread for the loop is the best solution.
I cant wrap my head around the necessity of spawning another async thread since I already have one created by tokio::main
even if I spawn another async task all the work will be done in there and nowhere else since this blocking operation is the entrypoint of everything
The redis.get_message
call will take a long time, so if you run it on Tokio's worker thread, then no other tasks can run on that thread while get_message
is running. Try running the loop with #[tokio::main(core_threads = 1)]
to see your program deadlock with your loop.
there will be no other task running anyway redis::get_message is the starting point of everything so anyway everything else must wait for it to get data this is a very small service getting its entire data through the message then spawning async tasks to do other things
Your tokio::spawn
would never run if the runtime only has one thread. Removing one of the threads from the runtime and hoping it doesn't run out is a very fragile way of coding.
So should I use channels to communicate between blocking and non blocking parts ? such as
let (tx,rx) = channel();
thread::spawn(move || {loop{
let event = ...
tx.send(event);
}});
let rx = Arc::new(Mutex::new(rx));
task::spawn_blocking(async move ||{
loop {
rx.lock().unwrap().recv();
...
}
})
the broblem with this is that it will be still blocking so instead of one blocking async function now I have 2 neverending blocking function and also I introduce extra complexity of mutex this way
I can't read that. What do you want to happen when Redis gets a message?
If your issue is that the channel recv
function is blocking, then you are using the wrong channel. You want this channel, which has an sync send
and async recv
.
You definitely don't want your channels in an Arc<Mutex<...>>
.