Waking up Tokio Blocking Thread

I have some difficulties in understanding on how to organize communication between Tokio non-blocking task and Tokio blocking thread.

Let's assume I have an ordinary Tokio non-blocking task that needs to perform some long-term computations. Tokio documentation suggesting to spawn a tokio_threadpool::blocking task in this case. The question is how to organize communication between the blocking code and non-blocking code? My idea was to provide an init oneshot channel to notify Blocking code to start execution. And also to send a Sender oneshot through the init that the Blocking code will use once the work is done:

#![feature(async_await)]

use std::error::Error;
use tokio::spawn;
use tokio::prelude::*;
use tokio::future::poll_fn;
use tokio_threadpool::blocking;
use tokio::sync::oneshot::{Sender, Receiver, channel};

#[tokio::main]
async fn main() {
    let (init_tx, mut init_rx) = channel::<(usize, Sender<bool>)>();

    spawn(async move {
        poll_fn(|_| blocking(|| {
            println!("In thread");

            // A point of failure
            let (x, done_tx) = init_rx.try_recv().unwrap();

            println!("X received: {}", x);

            done_tx.send(true);

        })).await;
    });

    // Removing this line makes code work
    std::thread::sleep(std::time::Duration::from_secs(5));

    println!("Before init");

    let (done_tx, done_rx) = channel();

    init_tx.send((100, done_tx)).map_err(|_| panic!("Failed to send"));

    println!("After init");

    done_rx.await;
}

Unfortunately it doesn't work. This is output:

In thread
thread 'tokio-runtime-worker-0' panicked at 'called `Result::unwrap()` on an `Err` value: TryRecvError(())', src/libcore/result.rs:1084:5
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace.
Before init
thread 'main' panicked at 'Failed to send', src/main.rs:35:46

The issue here is that the Blocking thread is trying to read init_rx before the non-blocking Task is sending the value. To fix this in an ordinary non-blocking Task I would just use let (x, done_tx) = init_rx.await;, but the blocking function doesn't accept an async code. I would also try to organize polling of the init_rx Receiver manually in the blocking code, but I don't know how to sleep the blocking code properly and to wake it up on init_rx readiness. Also, this method doesn't look robust anyway.

Please note, that I need communication with the Blocking code in async way, because(aside from this simplified example) my Blocking thread is going to do continuous computations and is going to notify non-blocking Tokio tasks on progress(through other channels). I also cannot split Blocking code into several blocks, because the entire work needs to be done in a single Thread(this is a requirement from a 3rd party crate I'm going to use there).

I would like to understand how to achieve this goal with Tokio, preferably within a Tokio idiomatic way? And preferably with Futures 0.3.0.

Thanks in advance!

1 Like

You should only be using the tokio channels if you are using the receiver as a future. What's wrong with using a blocking channel such as std::sync::mpsc or crossbeam::channel.

Of course, messages sent the other way can use the tokio channels.

This topic was automatically closed 90 days after the last reply. New replies are no longer allowed.