Is there an easy way to have a channel that sends in blocking mode, receives in async mode?

Suppose I have a blocking serial port library. I want to send its packets to a channel, but receive async in the other side of the channel.

The way that I thought was:

Have a queue structure to accommodate the packets and a condvar to notify when I put a packet there, which would wake the async waiter.

But is there a simpler way? It looks like the futures channels kinda do this but both sides are async

If you're using the tokio runtime, tokio::sync::mpsc::Sender has a blocking_send method.

I'm trying to be runtime agnostic as this is a simple task. Can it be done just with the futures crate?

IIRC everything in tokio::sync is runtime agnostic.

Edit: While not stated explicitly in the docs, I found a reference:

That said, some utilities do not need external code: For example a channel can coordinate the two halves and make one half wake the other half up, meaning that no external code is needed to handle that situation. In fact the entire tokio::sync module is executor agnostic for this reason. Additionally the entire futures crate provides exclusively executor agnostic utilities.

Technically yes, it can be done with just the futures crate:

use futures::channel::mpsc;
use futures::executor::block_on;
use futures::sink::SinkExt;
use futures::stream::StreamExt;
fn main() {
    let (mut tx, mut rx) = mpsc::channel(100);
    // send returns a Result; unwrap it so a closed channel is not silently ignored
    block_on(tx.send(42)).unwrap();
    println!("received {} from channel", block_on(rx.next()).unwrap())
}

Note the use of block_on. It starts up a lightweight single-threaded executor that runs the given future to completion, blocking the current thread until it's done.
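To make that description concrete, here is a minimal sketch of what such an executor could look like using only the standard library. It is an illustration, not the actual implementation in the futures crate; the ThreadWaker type and this block_on function are hypothetical names made up for the example.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Hypothetical waker: waking it unparks the thread blocked in `block_on`.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Illustrative stand-in for an executor's `block_on` (an assumption,
/// not the futures crate's code): poll the future on the current thread
/// and park the thread whenever the future is not ready yet.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(value) => return value,
            // Sleep until the waker unparks us, then poll again.
            // Spurious wakeups are harmless because we simply re-poll.
            Poll::Pending => thread::park(),
        }
    }
}

fn main() {
    let answer = block_on(async { 40 + 2 });
    println!("{answer}");
}
```

The calling thread does nothing else while it waits, which is why this is only appropriate on a thread that is allowed to block.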

But yes, it is also true that

I still think the first answer is better though. The tokio channel is actually designed to bridge the gap between sync and async worlds. Saves you the trouble of wrapping everything in block_on.

It is:

The mpsc channel does not care about which runtime you use it in, and can be used to send messages from one runtime to another. It can also be used in non-Tokio runtimes.

source

I see, I only checked the top-level documentation of the sync module. It might be worth expanding the docs in this regard. I don't know how often questions like this topic or #4232 come up in practice, or how best to communicate which parts of tokio are runtime-dependent and which aren't.

Good idea. Can you file a doc bug about mentioning it in the sync docs?

Flume also has sync + async channels

Sure. Corresponding issue.

Does that mean I can run an async function that uses rx, call block_on on the tx side, and it will not mess with the async function's runtime, since the block_on runs in a different thread?

Yes, such a channel is not complicated. No need to pull in any dependencies. I assume this example meets your requirements:

use std::collections::VecDeque;
use std::future::{poll_fn, Future};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Waker};

#[derive(Debug)]
pub struct SimpleChannelClosed;

struct SimpleChannel<T> {
    queue: VecDeque<T>,
    waker: Option<Waker>,
    closed: bool,
}

impl<T> SimpleChannel<T> {
    fn new() -> Self {
        SimpleChannel {
            queue: VecDeque::new(),
            waker: None,
            closed: false,
        }
    }

    fn wake(&mut self) {
        if let Some(waker) = self.waker.take() {
            waker.wake();
        }
    }
}

pub struct SimpleSender<T>(Arc<Mutex<SimpleChannel<T>>>);

impl<T> SimpleSender<T> {
    pub fn send(&self, value: T) -> Result<(), SimpleChannelClosed> {
        let mut channel = self.0.lock().unwrap();
        if !channel.closed {
            channel.queue.push_back(value);
            channel.wake();
            Ok(())
        } else {
            Err(SimpleChannelClosed)
        }
    }
}

impl<T> Drop for SimpleSender<T> {
    fn drop(&mut self) {
        let mut channel = self.0.lock().unwrap();
        channel.closed = true;
        channel.wake();
    }
}

pub struct SimpleReceiver<T>(Arc<Mutex<SimpleChannel<T>>>);

impl<T> SimpleReceiver<T> {
    pub fn recv(&mut self) -> impl Future<Output = Result<T, SimpleChannelClosed>> + '_ {
        poll_fn(|cx| {
            let mut channel = self.0.lock().unwrap();

            if let Some(value) = channel.queue.pop_front() {
                Poll::Ready(Ok(value))
            } else if channel.closed {
                Poll::Ready(Err(SimpleChannelClosed))
            } else {
                channel.waker = Some(cx.waker().clone());
                Poll::Pending
            }
        })
    }
}

impl<T> Drop for SimpleReceiver<T> {
    fn drop(&mut self) {
        let mut channel = self.0.lock().unwrap();
        channel.queue.clear();
        channel.closed = true;
    }
}

pub fn simple_channel<T>() -> (SimpleSender<T>, SimpleReceiver<T>) {
    let channel = Arc::new(Mutex::new(SimpleChannel::new()));
    let sender = SimpleSender(channel.clone());
    let receiver = SimpleReceiver(channel);
    (sender, receiver)
}

async fn test_simple_channel() {
    let (tx, mut rx) = simple_channel();

    std::thread::spawn(move || {
        for i in 0..3 {
            tx.send(i).unwrap();
        }
    });

    while let Ok(msg) = rx.recv().await {
        println!("received: {}", msg);
    }
}

The receiving end's recv takes &mut self to ensure that only one receiver is waiting at a time, because in this example only one waker can be stored inside the channel.
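As a rough illustration of why a single waker slot supports only one waiting task, the sketch below registers two wakers in one Option<Waker> slot and then fires it once. The WakerSlot and CountingWaker types are hypothetical helpers written for this demo and are not part of the channel above.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Wake, Waker};

/// Hypothetical waker that just counts how often it was woken.
struct CountingWaker(AtomicUsize);

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

/// Hypothetical single-slot storage, mirroring the `waker: Option<Waker>` field.
struct WakerSlot {
    waker: Option<Waker>,
}

impl WakerSlot {
    /// Registering overwrites whatever waker was stored before.
    fn register(&mut self, waker: Waker) {
        self.waker = Some(waker);
    }

    /// Wakes the stored waker, if any, and empties the slot.
    fn wake(&mut self) {
        if let Some(waker) = self.waker.take() {
            waker.wake();
        }
    }
}

/// Registers two wakers, wakes once, and returns how often each was woken.
fn wake_counts() -> (usize, usize) {
    let first = Arc::new(CountingWaker(AtomicUsize::new(0)));
    let second = Arc::new(CountingWaker(AtomicUsize::new(0)));
    let mut slot = WakerSlot { waker: None };

    slot.register(Waker::from(first.clone()));
    slot.register(Waker::from(second.clone())); // replaces the first waker
    slot.wake();

    (first.0.load(Ordering::SeqCst), second.0.load(Ordering::SeqCst))
}

fn main() {
    let (first, second) = wake_counts();
    println!("first woken {first} time(s), second woken {second} time(s)");
}
```

The first waker is never notified, so a second concurrent waiter on such a channel would hang. Requiring exclusive access to the receiver rules that situation out at compile time.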

Please note: I reworked the code a little bit; I had forgotten to wake on sender drop. Looks like it's not not so complicated... :wink:

Well, block_on blocks the current thread. If your serial code is already running in its own thread, then it's safe to call block_on. It sounds like it is, but I can't know for sure unless I see your code.

Neat, I hadn't seen flume before. The only other multi-producer, multi-consumer async channel I'd seen before was async_channel. I'll have to give it a more detailed look.
