Best practices for "bridging" async and sync (particularly read/write)?

I have some code that is migrating to async. It currently uses the tar crate (sync), and while https://crates.io/crates/tokio-tar exists, it hasn't been updated for tokio 1.0, and I'm also trying to avoid rewriting everything at once.

This topic is hard to search for, but basically: are there crates or APIs I might be missing to bridge between e.g. AsyncRead and Read? I came up with this helper: ostree-rs-ext/import.rs at 17a991050c5bf371633695b0bbd5cdbb3d717bca · ostreedev/ostree-rs-ext · GitHub
But it's suboptimal in various ways, e.g. the write is blocking. I think what it should be doing is converting one fd from the pipe using an AsyncFd etc.

Then, the more I looked at this, the more the way we're constantly allocating Bytes here also seemed wrong; we probably want a shared BytesMut behind a mutex or so (effectively replicating an OS-level pipe in process)? But I'm unsure about using e.g. a tokio Mutex from both async code and a sync spawned helper thread.

Anyways I'm sure I'm not the first person to hit this but I haven't found a good way to do a web search for this because all the keywords are too generic.

Seems like something like this would be worth having in tokio_util::io, perhaps?

I haven't done much with the async ecosystem, so I don't know what tooling already exists. If I needed to write this adapter myself, I'd probably spin up a worker thread to handle the sync I/O and use some kind of async-enabled channel to communicate with it.

An alternative to channels would be to use something like ringbuf, which is non-blocking (but also non-async) paired with Notify to wake up the async side when there's new data available.

Right, actually an earlier version of this used spawn_blocking, which is clearly more correct. And the tokio mpsc channel docs clearly spell out that you can use e.g. blocking_send() from a spawned thread to bridge sync/async. So that works fine, but it would involve a lot of allocation, shuffling Bytes around in both directions.
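
For reference, the sync half of the channel approach might look roughly like the sketch below. It is an illustration, not code from this thread: `ChannelReader` is a hypothetical name, and `std::sync::mpsc` stands in for an async-aware channel so the example runs without extra crates. With tokio you would presumably hold a `tokio::sync::mpsc::Receiver` instead and call `blocking_recv()` where this calls `recv()`.

```rust
use std::io::{self, Read};
use std::sync::mpsc::Receiver;

/// Hypothetical adapter: exposes a channel of byte chunks as `std::io::Read`.
struct ChannelReader {
    rx: Receiver<Vec<u8>>,
    /// Most recently received chunk.
    pending: Vec<u8>,
    /// How much of `pending` has already been handed out.
    pos: usize,
}

impl ChannelReader {
    fn new(rx: Receiver<Vec<u8>>) -> Self {
        ChannelReader { rx, pending: Vec::new(), pos: 0 }
    }
}

impl Read for ChannelReader {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        if buf.is_empty() {
            return Ok(0);
        }
        // Refill once the current chunk is used up. Empty chunks are
        // skipped so they are not mistaken for end of stream.
        while self.pos == self.pending.len() {
            match self.rx.recv() {
                Ok(chunk) => {
                    self.pending = chunk;
                    self.pos = 0;
                }
                // Every sender was dropped: report end of stream.
                Err(_) => return Ok(0),
            }
        }
        let n = buf.len().min(self.pending.len() - self.pos);
        buf[..n].copy_from_slice(&self.pending[self.pos..self.pos + n]);
        self.pos += n;
        Ok(n)
    }
}
```

Each chunk is still a separate allocation, which is the overhead mentioned above; the adapter only hides it behind `Read`.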

I've got an adapter that turns sync io::Write into an async Stream. I know it's not what you need, but maybe this will give you an idea how to approach the problem:

https://gitlab.com/crates.rs/crates.rs/-/blob/master/server/src/writer.rs

Basically, an async channel is needed on the async side, and block_on on the sync side.

Thanks, that is helpful. I hadn't seen the technique of passing a Handle around like that.

Some random questions:

  • Why the eprintln!() in the code vs propagating errors to the caller and letting them log? Or failing that, using the tracing crate?
  • Why not call Handle::block_on(sent) directly?
  • Why the T generic versus hardcoding Bytes?

eprint is a leftover from my printf-debugging. I should have been calling handle directly (this one is a scar from upgrading all the way from tokio 0.1)

EDIT: We ended up merging code for this into tokio; see SyncIoBridge in tokio_util::io.


OK finally wrote this up, it is as simple as:

use std::io::prelude::*;
use std::pin::Pin;
use tokio::io::{AsyncRead, AsyncReadExt};

struct ReadBridge {
    reader: Pin<Box<dyn AsyncRead + Send + Unpin + 'static>>,
    rt: tokio::runtime::Handle,
}

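impl Read for ReadBridge {
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        let mut reader = self.reader.as_mut();
        // Drive the async read to completion on the runtime. Handle::block_on
        // panics if called from within an async task, so only use this from
        // a plain thread such as one started by spawn_blocking.
        self.rt.block_on(async { reader.read(buf).await })
    }
}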

/// Bridge from AsyncRead to Read.
pub(crate) fn async_read_to_sync<S: AsyncRead + Unpin + Send + 'static>(
    reader: S,
) -> impl Read + Send + Unpin + 'static {
    // Handle::current() panics outside a tokio runtime, so call this from async code.
    let rt = tokio::runtime::Handle::current();
    let reader = Box::pin(reader);
    ReadBridge { reader, rt }
}

(I imagine I could avoid the boxing with pin-project or something)

Thanks for giving me the hint to use Handle!

This seems generally simple and useful enough to be in tokio-util or something right?

Another option is to just throw this entire thing into a block_on call.

let mut buf = vec![0u8; 8192].into_boxed_slice();
loop {
    let len = reader.read(&mut buf).await?;
    if len == 0 { break; }
    writer.write_all(&buf[..len])?;
}

This would normally not be acceptable in an async function because the write_all call will block the thread. However, when using block_on, the runtime guarantees that there are no other tasks on the same thread, so there is nothing else to block. This approach reduces the number of block_on calls, which, while cheap, are not free.

I need to bridge from AsyncRead to Read though, whereas you're bridging AsyncRead to Write?

That can be changed by moving the await.

let mut buf = vec![0u8; 8192].into_boxed_slice();
loop {
    let len = reader.read(&mut buf)?;
    if len == 0 { break; }
    writer.write_all(&buf[..len]).await?;
}

(I'm new to async so sorry if I'm missing something here)

And for full context you can see the code I am working on here: Fix async -> sync read bridge by cgwalters · Pull Request #26 · ostreedev/ostree-rs-ext · GitHub

Your code seems to be taking a Read and bridging it to AsyncWrite? Still not what I need here, I want to go AsyncRead -> Read. Here the Read is being passed to a thread inside a spawn_blocking() that's turning it into a tar::Archive and parsing that synchronously.

Fair enough. If you need to pass it to a function that requires a Read, then you have to do what you did.
