How can I use code written against Read/Write/Seek with AsyncRead/AsyncWrite/AsyncSeek streams?

I want to implement a file format reader/writer that can be used with both Read/Write/Seek and AsyncRead/AsyncWrite/AsyncSeek streams. How should I do that?
There is also quite a bit of existing code written against Read/Write/Seek, such as std's formatting macro writeln! and the from_reader/to_writer functions or methods that are customary in the serde ecosystem. How can I use them with AsyncRead/AsyncWrite/AsyncSeek streams?

futures_util::io::AllowStdIo seems to be the opposite of this case: it makes a sync stream usable as an async one, rather than making code that expects a sync stream accept an async one.

You cannot easily do this. Passing an AsyncRead to something that needs a Read would involve blocking the thread since the Read::read method has no await.

I'm surprised that nobody has written an adapter for io::Read/io::Write and Stream/AsyncRead/Write yet.

It is fiddly, and requires dealing with channels, buffers, and threads, but it's a common need.

There's SyncIoBridge in tokio-util, which allows you to use a tokio::io::AsyncRead as if it were actually a std::io::Read, but no one has done the other direction (spawn_blocking is probably better than channels, buffers and threads, at least on Tokio).

Conclusion up front: this problem is really a matter of looking at it from the other side. Making code that operates on "R+W+S" work with "AR+AW+AS" streams, and converting "AR+AW+AS" streams into "R+W+S" so that "R+W+S" code can operate on them, are just two approaches to exactly the same effect. The former is so hard that even with all the channels, buffers and threads you may not be able to implement it well, even though it is theoretically possible. The latter is much easier, has several implementations, and is used in at least one library that needs to support both "R+W+S" and "AR+AW+AS". Yes, I mean futures_util::io::AllowStdIo, tokio_util::io::SyncIoBridge and so on.
The problem came up while I was implementing a file format. I wanted the code to work simply in various environments and to avoid dealing with annoying async bounds. After implementing it on "AR+AW+AS", I began implementing a network protocol. While using tungstenite as a reference, I suddenly remembered that tungstenite implements the WebSocket protocol on top of "R+W" sync sockets, while two additional crates, tokio-tungstenite and async-tungstenite, bring it to the async world and accept "AR+AW" async sockets. A protocol implementation that needs to be compatible with both the sync and the async world is exactly the same scenario. And tokio-tungstenite and async-tungstenite are widely used async WebSocket implementations, which suggests that this approach has no huge undiscovered problems or footguns.
So how do they make it possible? They include an adapter named AllowStd in the file compat.rs; the name is straightforward enough. Judging by its interface, it converts an "AR+AW+AS" into an "R+W+S", exactly like futures_util::io::AllowStdIo or tokio_util::io::SyncIoBridge. But looking inside, there are two waker proxies, one for read and one for write, which seems very reasonable and runtime-independent, while futures_util::io::AllowStdIo uses a dummy infinite loop, which is completely unacceptable in an async context, and tokio_util::io::SyncIoBridge includes some runtime machinery, which is good but ties it to one runtime.
So the solution is to port this AllowStd into a separate crate, wrap it around async file handles or sockets, and pass the result to readers or writers that are implemented for "R+W+S" as normal, right?

Tungstenite can do it because it works even if the operations on the Read/Write traits return WouldBlock errors. When those errors happen, tungstenite returns the error back to tokio-tungstenite, who then uses some Tokio-specific mechanism to wait until the socket becomes ready again, and then it calls into tungstenite again. The vast majority of libraries that use the Read/Write traits do not work in this way, and will break if Read/Write returns any errors, including WouldBlock. It's quite restrictive because you have to be able to pause your progress at any point and resume it later. For example, you can't use read_exact or write_all since those methods are incompatible with having to be paused in the middle of the operation.
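To make the read_exact point concrete, here is a minimal std-only sketch. ScriptedReader and lost_bytes_demo are hypothetical names invented for this illustration; the reader stands in for a non-blocking socket and is not taken from tungstenite or Tokio.

```rust
use std::collections::VecDeque;
use std::io::{self, ErrorKind, Read};

/// Hypothetical stand-in for a non-blocking socket: each script entry is
/// either a chunk of data (`Some`) or a "not ready yet" step (`None`).
pub struct ScriptedReader {
    script: VecDeque<Option<Vec<u8>>>,
}

impl Read for ScriptedReader {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        match self.script.pop_front() {
            // Script exhausted: report end of stream.
            None => Ok(0),
            // Nothing available right now.
            Some(None) => Err(ErrorKind::WouldBlock.into()),
            Some(Some(mut chunk)) => {
                let n = chunk.len().min(buf.len());
                buf[..n].copy_from_slice(&chunk[..n]);
                if n < chunk.len() {
                    // Keep the part that did not fit for the next call.
                    self.script.push_front(Some(chunk.split_off(n)));
                }
                Ok(n)
            }
        }
    }
}

/// Tries to read 4 bytes with `read_exact` across a `WouldBlock`, then
/// returns the error kind and whatever is still readable afterwards.
pub fn lost_bytes_demo() -> (ErrorKind, Vec<u8>) {
    let mut reader = ScriptedReader {
        script: VecDeque::from(vec![Some(b"ab".to_vec()), None, Some(b"cd".to_vec())]),
    };
    let mut buf = [0u8; 4];
    // `read_exact` consumes "ab", hits `WouldBlock`, and gives up.
    let err = reader.read_exact(&mut buf).unwrap_err();
    // Retrying later only sees what is left in the stream.
    let mut rest = Vec::new();
    reader.read_to_end(&mut rest).unwrap();
    (err.kind(), rest)
}
```

The two bytes consumed before the WouldBlock are not handed back on retry, so code built on read_exact cannot simply be called again once the socket is ready.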

If you want to support both async and blocking interfaces, then I recommend that you design your library in the following way:

  1. Create one shared crate with all of the actual logic that operates only on byte arrays.
  2. For each context you want it to work in, create a wrapper crate that makes it possible to use the shared crate via the appropriate IO traits. For example, you might want to create one for blocking IO, one for use in Tokio, and perhaps also one for use with async-std and so on.

The shared crate would have methods like these:

  • A method that gives it a byte array saying "I just read some bytes, here they are."
  • A method that asks it to write some bytes to an array, which the wrapper crate will then write to the actual IO resource.
  • Maybe that method can also say "please seek to X position now" by returning some special value.

This kind of design should make it relatively easy to write the wrapper crates. The wrapper crate would just pass byte arrays between the shared crate and the IO resource.
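As a rough illustration of that split, here is a minimal std-only sketch. The frame format (one length byte followed by the payload) and the names FrameParser, feed, next_frame and read_frame are all assumptions made up for this example; only the read half is shown, and seeking is left out.

```rust
use std::io::{self, Read};

/// Hypothetical shared core: it knows the format but never touches IO.
#[derive(Default)]
pub struct FrameParser {
    pending: Vec<u8>,
}

impl FrameParser {
    /// "I just read some bytes, here they are."
    pub fn feed(&mut self, bytes: &[u8]) {
        self.pending.extend_from_slice(bytes);
    }

    /// Returns the next complete payload, or `None` if more input is needed.
    pub fn next_frame(&mut self) -> Option<Vec<u8>> {
        let len = *self.pending.first()? as usize;
        if self.pending.len() < 1 + len {
            return None;
        }
        // Remove the length byte and the payload, keeping only the payload.
        Some(self.pending.drain(..1 + len).skip(1).collect())
    }
}

/// Hypothetical blocking wrapper: it only moves bytes between the IO
/// resource and the core. An async wrapper would have the same loop with
/// an awaited read in place of `io.read`.
pub fn read_frame<R: Read>(
    parser: &mut FrameParser,
    io: &mut R,
) -> io::Result<Option<Vec<u8>>> {
    let mut chunk = [0u8; 4096];
    loop {
        if let Some(frame) = parser.next_frame() {
            return Ok(Some(frame));
        }
        let n = io.read(&mut chunk)?;
        if n == 0 {
            // End of stream. A real wrapper would probably report an error
            // here if a partial frame is still buffered.
            return Ok(None);
        }
        parser.feed(&chunk[..n]);
    }
}
```

Because the parser keeps its own buffer, it does not matter where the wrapper's reads happen to split the input, and a WouldBlock from the IO resource just means "call again later" without losing state.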

Note that what I described above is almost the same as what tungstenite does under the hood. It just happens to use Read/Write traits instead of passing around byte arrays, but because it has to support WouldBlock errors, it is essentially the same thing.

Does this make sense?

So do you mean that tungstenite uses "R+W" as the shared interface? What is the difference from implementing a reader/writer for "R+W+S" normally and using a wrapper? Or is the difference in the wrapper? I'm still confused about this.
tokio-tungstenite uses some Tokio-specific mechanism to handle WouldBlock errors, so how can async-tungstenite be runtime-independent? By just leaving WouldBlock errors alone? Even in its tokio and async-std compat code I found nothing that handles WouldBlock; there are only some wrapper functions for the respective TcpStream.

There also seems to be no code that handles WouldBlock in tokio-tungstenite, so does the magic happen inside the Tokio runtime, with tokio-tungstenite relying on it? What is the situation with other runtimes?

async-tungstenite is not runtime-independent. It has a tokio file with Tokio-specific things and an async-std file with async-std-specific things.

You just missed it.

In tokio-tungstenite, when Tokio returns a Poll::Pending from its TcpStream, it is converted to a WouldBlock error in src/compat.rs like this:

Poll::Pending => Err(std::io::Error::from(std::io::ErrorKind::WouldBlock))

Then, that error goes through tungstenite, which then gives it back to tokio-tungstenite in several places in src/lib.rs. Those places can be found by searching for cvt, since it uses the following utility function:

pub(crate) fn cvt<T>(r: Result<T, WsError>) -> Poll<Result<T, WsError>> {
    match r {
        Ok(v) => Poll::Ready(Ok(v)),
        Err(WsError::Io(ref e)) if e.kind() == std::io::ErrorKind::WouldBlock => {
            trace!("WouldBlock");
            Poll::Pending
        }
        Err(e) => Poll::Ready(Err(e)),
    }
}

To be clear, the actual pause/resume handling happens in tungstenite. The tokio-tungstenite and async-tungstenite crates are just wrapper crates.
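Both directions of the conversion quoted above fit in a few lines of std-only Rust. This is a simplified sketch, not the crates' actual code: poll_to_io and io_to_poll are hypothetical names, plain io::Error is used instead of tungstenite's WsError, and the waker bookkeeping that a real adapter needs is left out.

```rust
use std::io::{self, ErrorKind};
use std::task::Poll;

/// Async-to-sync direction: what a `Read`/`Write` adapter can hand to
/// synchronous code when the underlying async poll is not ready.
pub fn poll_to_io<T>(poll: Poll<io::Result<T>>) -> io::Result<T> {
    match poll {
        Poll::Ready(result) => result,
        Poll::Pending => Err(io::Error::from(ErrorKind::WouldBlock)),
    }
}

/// Sync-to-async direction: turn a `WouldBlock` that bubbled up through
/// the synchronous code back into `Poll::Pending`.
///
/// Assumption: the poll that originally returned `Pending` already
/// registered the task's waker, so the task is woken when the IO resource
/// becomes ready. This function does not arrange any wake-up itself.
pub fn io_to_poll<T>(result: io::Result<T>) -> Poll<io::Result<T>> {
    match result {
        Err(e) if e.kind() == ErrorKind::WouldBlock => Poll::Pending,
        other => Poll::Ready(other),
    }
}
```

The synchronous code in the middle only has to pass the WouldBlock error upward without discarding its own state, which is the pause/resume requirement described above.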

The difference between implementing "R+W+S" normally and doing what I suggested, or what tungstenite is doing, is that your code must work even if the R+W+S gives you a WouldBlock error.

So that's it. I had assumed that code using "Tokio's mechanism" must call into Tokio's runtime code, and that this code just left WouldBlock errors alone. In fact it is less Tokio's mechanism and more tungstenite's internal mechanism. Now I think I understand what WouldBlock means and where the difference mentioned earlier lies.
One thing I didn't mention earlier is that tokio-tungstenite has another wrapper around AllowStd, which is the key to exposing async interfaces to the outside. Without it, sync operations cannot be converted to async directly. So if I want to be compatible with both synchronous and asynchronous contexts, I have to end up with such a wrapper in my code and expose the "AR+AW+AS" interface through it.
In addition, is it possible to implement a generic async-io-compat crate or something similar that provides an "AR+AW+AS" interface when the internal "R+W+S" code handles the WouldBlock case?

Yes, that should be possible.

I just realized that the tokio_util::codec module provides an excellent example of how you might implement what I described earlier. Basically, the way the module works is that you provide an implementation of the Encoder/Decoder traits. Then the codec module wraps an AsyncRead/AsyncWrite with your codec (via its Framed types) and handles all of the IO for you, exposing decoded items as a Stream and accepting items to encode as a Sink. The Encoder/Decoder traits are designed such that it would be easy to write an equivalent adapter on top of Read/Write as well.

If you look at the traits, you will notice that they work solely by modifying byte arrays, and that the Encoder/Decoder does not touch any IO traits. (They use the bytes crate for the byte array types.)

See the examples in the module documentation linked above for an explanation of how it works.
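To show what such a blocking adapter might look like, here is a std-only sketch. The Decode and Encode traits below are simplified stand-ins written for this example (they use Vec<u8> where tokio_util uses BytesMut, and io::Error where tokio_util has an associated error type); Lines and SyncFramed are likewise hypothetical names, not tokio_util APIs.

```rust
use std::io::{self, Read, Write};

/// Stand-in for a Decoder-style trait: take bytes out of `src` and return
/// `Ok(None)` when more input is needed.
pub trait Decode {
    type Item;
    fn decode(&mut self, src: &mut Vec<u8>) -> io::Result<Option<Self::Item>>;
}

/// Stand-in for an Encoder-style trait: append the encoded item to `dst`.
pub trait Encode<Item> {
    fn encode(&mut self, item: Item, dst: &mut Vec<u8>) -> io::Result<()>;
}

/// Example codec: newline-delimited UTF-8 lines. It never touches IO.
pub struct Lines;

impl Decode for Lines {
    type Item = String;
    fn decode(&mut self, src: &mut Vec<u8>) -> io::Result<Option<String>> {
        let pos = match src.iter().position(|&b| b == b'\n') {
            Some(pos) => pos,
            None => return Ok(None),
        };
        let mut line: Vec<u8> = src.drain(..=pos).collect();
        line.pop(); // drop the trailing '\n'
        String::from_utf8(line)
            .map(Some)
            .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))
    }
}

impl<'a> Encode<&'a str> for Lines {
    fn encode(&mut self, item: &'a str, dst: &mut Vec<u8>) -> io::Result<()> {
        dst.extend_from_slice(item.as_bytes());
        dst.push(b'\n');
        Ok(())
    }
}

/// Blocking adapter that drives any codec over std's Read/Write.
pub struct SyncFramed<T, C> {
    io: T,
    codec: C,
    buf: Vec<u8>,
}

impl<T, C> SyncFramed<T, C> {
    pub fn new(io: T, codec: C) -> Self {
        Self { io, codec, buf: Vec::new() }
    }
    pub fn into_inner(self) -> T {
        self.io
    }
}

impl<T: Read, C: Decode> SyncFramed<T, C> {
    /// Reads until the codec yields an item; `Ok(None)` at end of stream.
    pub fn read_item(&mut self) -> io::Result<Option<C::Item>> {
        let mut chunk = [0u8; 4096];
        loop {
            if let Some(item) = self.codec.decode(&mut self.buf)? {
                return Ok(Some(item));
            }
            let n = self.io.read(&mut chunk)?;
            if n == 0 {
                return Ok(None);
            }
            self.buf.extend_from_slice(&chunk[..n]);
        }
    }
}

impl<T: Write, C> SyncFramed<T, C> {
    /// Encodes one item into a scratch buffer and writes it out.
    pub fn write_item<I>(&mut self, item: I) -> io::Result<()>
    where
        C: Encode<I>,
    {
        let mut out = Vec::new();
        self.codec.encode(item, &mut out)?;
        self.io.write_all(&out)
    }
}
```

The codec is the only part that knows the format, so the same Lines value could sit behind an async adapter without changes; only SyncFramed is specific to blocking IO.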
