I'm having difficulty understanding how to organize communication between a Tokio non-blocking task and a Tokio blocking thread.
Let's assume I have an ordinary Tokio non-blocking task that needs to perform some long-running computations. The Tokio documentation suggests using tokio_threadpool::blocking in this case. The question is how to organize communication between the blocking code and the non-blocking code. My idea was to provide an init oneshot channel that notifies the blocking code to start execution, and to send through init the Sender of a second oneshot channel that the blocking code will use once the work is done:
    #![feature(async_await)]

    use std::error::Error;
    use tokio::spawn;
    use tokio::prelude::*;
    use tokio::future::poll_fn;
    use tokio_threadpool::blocking;
    use tokio::sync::oneshot::{Sender, Receiver, channel};

    #[tokio::main]
    async fn main() {
        let (init_tx, mut init_rx) = channel::<(usize, Sender<bool>)>();

        spawn(async move {
            poll_fn(|_| blocking(|| {
                println!("In thread");
                // A point of failure
                let (x, done_tx) = init_rx.try_recv().unwrap();
                println!("X received: {}", x);
                done_tx.send(true);
            })).await;
        });

        // Removing this line makes code work
        std::thread::sleep(std::time::Duration::from_secs(5));

        println!("Before init");
        let (done_tx, done_rx) = channel();
        init_tx.send((100, done_tx)).map_err(|_| panic!("Failed to send"));
        println!("After init");
        done_rx.await;
    }
Unfortunately, it doesn't work. This is the output:
    In thread
    thread 'tokio-runtime-worker-0' panicked at 'called `Result::unwrap()` on an `Err` value: TryRecvError(())', src/libcore/result.rs:1084:5
    note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace.
    Before init
    thread 'main' panicked at 'Failed to send', src/main.rs:35:46
The issue here is that the blocking thread tries to read init_rx before the non-blocking task has sent the value. In an ordinary non-blocking task I would fix this by simply using let (x, done_tx) = init_rx.await;, but the blocking function doesn't accept async code. I could also try to poll the init_rx Receiver manually inside the blocking code, but I don't know how to properly put the blocking code to sleep and wake it up once init_rx is ready. In any case, that approach doesn't look robust.
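To illustrate the difference between the failing non-blocking read and the blocking wait described above, here is a minimal sketch that uses only the standard library. It deliberately swaps Tokio's oneshot channels for std::sync::mpsc and the blocking closure for a plain std::thread, so it shows the desired handshake rather than a Tokio solution. The function name handshake_demo is hypothetical.

```rust
use std::sync::mpsc::{channel, Sender, TryRecvError};
use std::thread;
use std::time::Duration;

/// Runs the init/done handshake on a dedicated thread.
/// Returns (whether an early `try_recv` found the channel empty,
///          the value the worker reported through `done_tx`).
fn handshake_demo() -> (bool, bool) {
    let (init_tx, init_rx) = channel::<(usize, Sender<bool>)>();

    // Nothing has been sent yet, so a non-blocking read fails right away.
    // This mirrors the `try_recv().unwrap()` panic in the question.
    let early_read_was_empty = matches!(init_rx.try_recv(), Err(TryRecvError::Empty));

    let worker = thread::spawn(move || {
        // `recv` parks this thread until a message arrives
        // (or returns an error if the sender is dropped).
        let (x, done_tx) = init_rx.recv().expect("init sender was dropped");
        println!("X received: {}", x);
        done_tx.send(true).expect("done receiver was dropped");
    });

    // Simulate a late sender, like the `sleep` in the question.
    thread::sleep(Duration::from_millis(50));

    let (done_tx, done_rx) = channel();
    init_tx.send((100, done_tx)).expect("worker exited early");
    let done = done_rx.recv().expect("worker dropped done_tx");

    worker.join().expect("worker panicked");
    (early_read_was_empty, done)
}
```

Calling handshake_demo() should return (true, true): the early try_recv sees an empty channel, while the worker's blocking recv simply waits until the value is sent.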
Please note that I need to communicate with the blocking code asynchronously because, aside from this simplified example, my blocking thread is going to do continuous computations and notify non-blocking Tokio tasks about its progress (through other channels). I also cannot split the blocking code into several blocks, because the entire work needs to be done in a single thread (this is a requirement of a 3rd-party crate I'm going to use there).
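The shape of that requirement, one thread that performs all of the work and reports progress through a channel, can be sketched as follows. This is only an illustration built on std::thread and std::sync::mpsc, not on Tokio: in a real Tokio program the receiving side would presumably need a channel that a task can await, and that part is not shown. The names spawn_worker and collect_progress and the summing loop are hypothetical placeholders for the real computation.

```rust
use std::sync::mpsc::{channel, Receiver};
use std::thread::{self, JoinHandle};

/// Starts a single dedicated thread that performs all `steps` of the work
/// and sends a percentage after each step.
fn spawn_worker(steps: u32) -> (Receiver<u32>, JoinHandle<u64>) {
    let (progress_tx, progress_rx) = channel();
    let handle = thread::spawn(move || {
        let mut acc: u64 = 0;
        for step in 1..=steps {
            // Placeholder for one unit of the real computation.
            acc += u64::from(step);
            // Report progress; ignore the error if nobody is listening.
            let _ = progress_tx.send(step * 100 / steps);
        }
        // `progress_tx` is dropped here, which closes the channel.
        acc
    });
    (progress_rx, handle)
}

/// Drains every progress update, then returns them with the final result.
fn collect_progress(steps: u32) -> (Vec<u32>, u64) {
    let (progress_rx, handle) = spawn_worker(steps);
    // The iterator ends once the worker has dropped its sender.
    let updates: Vec<u32> = progress_rx.iter().collect();
    let total = handle.join().expect("worker panicked");
    (updates, total)
}
```

For example, collect_progress(4) yields the updates 25, 50, 75, 100 and a final result of 10. Here the consumer blocks while draining the channel, which is exactly what an async task must not do, so this only demonstrates the worker side of the design.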
I would like to understand how to achieve this with Tokio, preferably in an idiomatic Tokio way and preferably with Futures 0.3.0.
Thanks in advance!