How do you turn a try_recv function into async recv?

I have a sync try_recv function that I am trying to turn into an async version. Is this the way to do it?

/// Wait for data or return a `DisconnectedError`
async fn recv(&self) -> Result<Vec<u8>, DisconnectedError> {
    loop {
        if let Some(data) = self.try_recv()? {
            return Ok(data);
        }

        futures_lite::future::yield_now().await;
    }
}

/// If no data is there to be received then returns Ok(None)
fn try_recv() -> Result<Option<Vec<u8>>, DisconnectedError> { ... }

EDIT: I just realized that in try_recv I could just return an empty Vec if nothing is received. As empty vectors don't allocate. But the question is more about making it async. Not sure if the intent of try_recv is clearer like that or like this.

That will work, but it will peg one CPU at 100% usage because the future will be polled again immediately.

To explain why, you can think of an async runtime as a loop that will continually pop elements from a queue of "tasks" so they can be polled, and every time you call .await it will push a task back onto the queue.

let event_queue = EventQueue::new();

while let Some(next_task) = event_queue.sleep_until_next_task_available() {
  next_task.poll(event_queue.context());
}

Normally a future will set things up so it won't be "woken up" again until "something" has happened which means it can make progress (e.g. a value was written to the channel).

A hacky way to avoid the busy loop from consuming all your CPU when you have no other way of signalling readiness is to insert a small sleep (e.g. tokio::time::sleep(Duration::from_millis(1))) instead of calling yield_now(). That said, it's always better to use something async-aware like replacing std::sync::mpsc channels with futures::channel::mpsc or std socket types with tokio's equivalents.

1 Like

In this case, there really was no other way I think as I am dealing with ffi and io stuff. But sleeping makes perfect sense. Thanks!

EDIT: actually the relevant part of the code uses inter-process communication using the ipc-channel crate.

Ah yeah if that's the case it'll be a pain because you'd need to manually create a future that integrates with the FFI code to put itself to sleep until data is available. You might be able to do this by using mio directly, but it also requires access to the file descriptor which is probably encapsulated deep inside the FFI code already.

If the FFI code has a blocking recv() method, another solution could be to call that from a background thread using tokio::task::spawn_blocking(). Technically, you are still doing the recv() in a blocking way, but because it's running on a background thread you won't accidentally lock up Tokio's event loop and "traditional" blocking code uses the OS's scheduler to sleep until data is ready.

Of course, if you don't care about the efficiency hit, the sleep is a fine solution.

Have you seen the ipc-channel's async feature flag? Enabling it gives you access to the IpcReceiver::to_stream() method which gives you a ipc_channel::asynch::IpcStream that implements futures::Stream.

If you import the futures::stream::StreamExt extension trait you'll get access to asynchronous .next() method which is a properly async-aware version of your recv().

Hmm, the async version of ipc-channel might be just the thing for this specific case. But I do this kind of looping elsewhere too so the sleep version is very useful too.

If you can, you should switch to an async channel instead.

You can spawn a thread to run a blocking recv() on, and use a oneshot channel to await it. If you have lots of channels to await like that, instead of using lots of threads, you can use crossbeam's Select.

Spawning threads for each recv() seems like a way too expensive thing to do. Especially as recv() could be called a LOT. I'll look into Select too.. Thanks.

Note that for the Select route you'll need to implement Future trait yourself, so that you can associate Waker with each channel you wait for and wake it when the Select says its ready.

If you control the channel implementation, it's best to switch to async-channel or use tokio's async mpsc instead of reimplementing basically the same thing.