Use `std::sync::mpsc::channel` with `tokio::select!`?

Hi, this is the following question in my previous post: How to use channels inside sync functions inside tokio runtime?, because the context is too long so a new topic is much easier to discuss.

So in a sync function, which runs inside a tokio runtime, I want to send a message to tokio::select! running in main thread with tokio runtime.

I had tried several methods, none of them actually works:

LocalSet:

  1. The block_on API requires the Runtime, but since the sync function is deep inside a runtime, I cannot get the Runtime for it.
  2. The run_util API requires the caller function (i.e. this sync function) also to be async, so I cannot use it.
  3. The spawn_local API will never be scheduled by runtime.

Why use LocalSet? because there's a main data structure is !Sync and !Send, so I cannot simply use tokio::spawn to handle it. Only spawn_local is acceptable.
And even use tokio::runtime::Handle, it almost gives me the same results.

But it seems tokio::select! only accept async functions/expressions, is it possible to receive the message with tokio::select!? Or do I have to use a std::thread::spawn to wait for this message, and re-send it to tokio::select!?

Maybe I could try to wrap the Receiver::try_recv into a future, so it can work with tokio::select! ?

Related discuss:

You can't. There isn't really any way to have try_recv become a future either.

Have you considered using tokio::sync::mpsc instead? That channel can be used from both sync and async.

3 Likes

hi, I finally solved my previous issues, i.e. send messages in pure sync function inside tokio runtime.

Just use tokio::runtime::Handle::spawn_blocking and Sender::blocking_send could work. So this is not necessary now.

1 Like

Since I recognize that you're an expert on this and thought that I had just done this, would you mind explaining to me, what I've overlooked here?

These two lines create an async form of busy waiting, consuming all available CPU cycles to repeatedly check the condition. I would generally consider this not an acceptable Future implementation outside of special circumstances (e.g. if the executor is known to only poll after progress is made elsewhere such that it’s reasonable to check again).

2 Likes

To expand on this a little bit: you tell the executor that if you get Err(TryRecvError::Empty) from try_recv, it should immediately poll you again as soon as possible.

What you're expected to do is to stash the waker returned by cx.waker() somewhere helpful (noting that you can clone it to get an owned waker) instead of immediately calling a wake method on it. Then, when something happens to receiver such that you're going to have work to do, you can call the wake method then, instead of immediately.

In the case as presented in the link, you control when people can call self.sender.send(), and you know (because you're using a Single Consumer channel) that there's only one receiver at any time. You can thus stash the waker next to the receiver in an Option<Waker>, and call its wake method when the send has completed, to wake up the receiver.

There's a gotcha to watch for, per sync_channel's docs - if BUF_SIZE == 0, then the send call won't return until you have a try_recv running, and you'll need to protect against this somehow (e.g. by making the channel BUF_SIZE.min(1) in size, instead of BUF_SIZE.

And you should probably prefer to use a genuine async channel, where someone else cares about all these details, such as Tokio's MPSC channel, instead of a sync channel; this provides a poll_recv method that does the right thing for your implementation already.

1 Like