I have some code that I'm migrating to async. It currently uses the (sync) tar crate, and while https://crates.io/crates/tokio-tar exists, it hasn't been updated for tokio 1.0, and I'm also trying to avoid rewriting everything at once.
Then the more I looked at this, the way we're constantly allocating Bytes here also seems wrong; probably want to have a shared BytesMut with a mutex or so (effectively replicating an OS level pipe in process)? But I got uncertain about using e.g. a tokio Mutex inside both async and a sync spawned helper thread.
Anyways I'm sure I'm not the first person to hit this but I haven't found a good way to do a web search for this because all the keywords are too generic.
Seems like something like this would be worth having in at least tokio_util::io, perhaps?
I haven't done much with the async ecosystem, so I don't know what tooling already exists. If I needed to write this adapter myself, I'd probably spin up a worker thread to handle the sync I/O and use some kind of async-enabled channel to communicate with it.
An alternative to channels would be to use something like ringbuf, which is non-blocking (but also non-async), paired with Notify to wake up the async side when new data is available.
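To make the shared-buffer idea a bit more concrete, here is a rough, std-only sketch of an in-process pipe: one bounded VecDeque behind a Mutex, with a Condvar doing the wake-ups. All of the names (Pipe, PipeWriter, PipeReader, pipe, pipe_demo) are made up for illustration, and both ends block here. On the async side you would presumably replace the Condvar wait with something like tokio's Notify, which this sketch does not attempt.

```rust
use std::collections::VecDeque;
use std::io::{self, Read, Write};
use std::sync::{Arc, Condvar, Mutex};

/// State shared by both ends of the (hypothetical) in-process pipe.
struct Shared {
    buf: VecDeque<u8>,
    closed: bool, // set when the writer is dropped
}

struct Pipe {
    state: Mutex<Shared>,
    changed: Condvar, // signalled whenever `state` changes
    capacity: usize,
}

struct PipeWriter(Arc<Pipe>);
struct PipeReader(Arc<Pipe>);

fn pipe(capacity: usize) -> (PipeWriter, PipeReader) {
    assert!(capacity > 0, "a zero-capacity pipe could never make progress");
    let p = Arc::new(Pipe {
        state: Mutex::new(Shared { buf: VecDeque::with_capacity(capacity), closed: false }),
        changed: Condvar::new(),
        capacity,
    });
    (PipeWriter(p.clone()), PipeReader(p))
}

impl Write for PipeWriter {
    fn write(&mut self, data: &[u8]) -> io::Result<usize> {
        let mut st = self.0.state.lock().unwrap();
        // Block until the reader has freed some space.
        // Limitation of this sketch: a dropped reader is not detected.
        while st.buf.len() == self.0.capacity {
            st = self.0.changed.wait(st).unwrap();
        }
        let n = data.len().min(self.0.capacity - st.buf.len());
        st.buf.extend(data[..n].iter().copied());
        self.0.changed.notify_all();
        Ok(n)
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

impl Drop for PipeWriter {
    fn drop(&mut self) {
        // Mark the pipe closed so a waiting reader observes EOF.
        self.0.state.lock().unwrap().closed = true;
        self.0.changed.notify_all();
    }
}

impl Read for PipeReader {
    fn read(&mut self, out: &mut [u8]) -> io::Result<usize> {
        let mut st = self.0.state.lock().unwrap();
        // Block until there is data or the writer has gone away.
        while st.buf.is_empty() && !st.closed {
            st = self.0.changed.wait(st).unwrap();
        }
        let n = out.len().min(st.buf.len());
        for (dst, src) in out.iter_mut().zip(st.buf.drain(..n)) {
            *dst = src;
        }
        self.0.changed.notify_all();
        Ok(n) // 0 means EOF: buffer empty and writer dropped
    }
}

/// Usage sketch: the small capacity forces the writer to block repeatedly.
fn pipe_demo() -> Vec<u8> {
    let (mut w, mut r) = pipe(8);
    let producer = std::thread::spawn(move || {
        w.write_all(b"0123456789abcdefghij").unwrap();
        // `w` is dropped here, which closes the pipe.
    });
    let mut out = Vec::new();
    r.read_to_end(&mut out).unwrap();
    producer.join().unwrap();
    out
}
```

The buffer is allocated once and reused, which is the point of doing this instead of shipping a fresh Bytes per chunk. The price is a lock on every read and write.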
Right, actually an earlier version of this used spawn_blocking, which is clearly more correct. And the mpsc channel docs clearly spell out that you can use e.g. blocking_send() from a spawned thread to bridge sync/async. So that works fine, but it would involve a lot of allocation, shuffling Bytes around in both directions.
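For reference, the sync half of the channel approach can be sketched roughly like this. To keep it std-only, std::sync::mpsc and a plain thread stand in for the tokio channel and the async task. With tokio I would expect the reader to hold a tokio::sync::mpsc::Receiver and call blocking_recv() from inside spawn_blocking instead. ChannelReader and channel_demo are hypothetical names, not an existing API.

```rust
use std::io::{self, Read};
use std::sync::mpsc::{sync_channel, Receiver};
use std::thread;

/// Hypothetical adapter: exposes a channel of byte chunks as a blocking `Read`.
struct ChannelReader {
    rx: Receiver<Vec<u8>>,
    // Chunk currently being drained, and how much of it has been consumed.
    current: Vec<u8>,
    pos: usize,
}

impl ChannelReader {
    fn new(rx: Receiver<Vec<u8>>) -> Self {
        Self { rx, current: Vec::new(), pos: 0 }
    }
}

impl Read for ChannelReader {
    fn read(&mut self, out: &mut [u8]) -> io::Result<usize> {
        // Refill from the channel once the current chunk is exhausted.
        // Empty chunks are skipped so they are not mistaken for EOF.
        while self.pos == self.current.len() {
            match self.rx.recv() {
                Ok(chunk) => {
                    self.current = chunk;
                    self.pos = 0;
                }
                // All senders dropped: report end of stream.
                Err(_) => return Ok(0),
            }
        }
        let n = out.len().min(self.current.len() - self.pos);
        out[..n].copy_from_slice(&self.current[self.pos..self.pos + n]);
        self.pos += n;
        Ok(n)
    }
}

/// Usage sketch: the producer thread stands in for the async side.
fn channel_demo() -> Vec<u8> {
    // Bounded channel, so a slow reader applies backpressure to the producer.
    let (tx, rx) = sync_channel::<Vec<u8>>(4);
    let producer = thread::spawn(move || {
        tx.send(b"hello ".to_vec()).unwrap();
        tx.send(b"world".to_vec()).unwrap();
        // `tx` is dropped here, which the reader sees as EOF.
    });
    let mut out = Vec::new();
    ChannelReader::new(rx).read_to_end(&mut out).unwrap();
    producer.join().unwrap();
    out
}
```

Something like ChannelReader is what could be handed to tar::Archive on the blocking thread. It still allocates one buffer per chunk, which is exactly the allocation overhead mentioned above.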
I've got an adapter that turns sync io::Write into an async Stream. I know it's not what you need, but maybe this will give you an idea how to approach the problem:
eprint is a leftover from my printf-debugging. I should have been calling handle directly (this one is a scar from upgrading all the way from tokio 0.1)
Another option is to just throw this entire thing into a block_on call.
// Async read, blocking write: only reasonable when driven by block_on.
let mut buf = vec![0u8; 8192].into_boxed_slice();
loop {
    let len = reader.read(&mut buf).await?;
    if len == 0 { break; } // EOF
    writer.write_all(&buf[..len])?;
}
This would normally not be acceptable in an async function because the write_all call will block the thread. However, when using block_on, the runtime guarantees that there are no other tasks on the same thread, so there is nothing else to block. This approach reduces the number of block_on calls, which, while cheap, are not free.
// Blocking read, async write.
let mut buf = vec![0u8; 8192].into_boxed_slice();
loop {
    let len = reader.read(&mut buf)?;
    if len == 0 { break; } // EOF
    writer.write_all(&buf[..len]).await?;
}
Your code seems to be taking a Read and bridging it to AsyncWrite? Still not what I need here, I want to go AsyncRead -> Read. Here the Read is being passed to a thread inside a spawn_blocking() that's turning it into a tar::Archive and parsing that synchronously.