Tokio/async-std compat, wrapping stream, impl AsyncRead error

I have a question about wrapping a stream and impl AsyncRead. I'm trying to wrap a tokio tcpstream in order to impl Clone. (Long story short, using tokio-util compat, going from tokio -> async-std, and playing with async-h1 which requires that the tcpstream is Clone). So for example:

/// Needed because async-std tcpstream impl Clone, but tokio tcpstream doesn't?
#[derive(Clone)]
struct WrapStream(Arc<Compat<TcpStream>>);

impl futures_io::AsyncRead for WrapStream {
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>> {
        Pin::new(&mut &*self.0).poll_read(cx, buf)
    }
}

and the error I get:

error[E0599]: no method named `poll_read` found for struct `std::pin::Pin<&mut &tokio_util::compat::Compat<tokio::net::tcp::stream::TcpStream>>` in the current scope
  --> examples/hello_tokio.rs:81:33
   |
81 |         Pin::new(&mut &*self.0).poll_read(cx, buf)
   |                                 ^^^^^^^^^ method not found in `std::pin::Pin<&mut &tokio_util::compat::Compat<tokio::net::tcp::stream::TcpStream>>`

I'm pretty sure that method should exist on Compat, tokio_util::compat::Compat - Rust, but not sure how to get to it. Various permutations to the code, including not pinning, removing references/dereferences, etc., don't help.

I don't think it's just imports, I've tried useing tokio/async-std Async{Read,Write}. I must have a deeper misunderstanding.

I ported this example from working code (async-std's http-service). (which doesn't use the compat layer).

I've also put together a repo with the example here: GitHub - hwchen/example-wrap-stream

Any help is appreciated.

This is because Arc does not allow you to have mutable access to the inner value, which is required to poll it. One option is to wrap it in an std mutex which you lock on every poll.

Why is the clone necessary, though?

I think that makes sense generally, but then how come this works? (well, I assume it works; I should probably try to compile it). http-service/lib.rs at master · http-rs/http-service · GitHub The link has code that's an Arc without Mutex.

The Clone is necessary because async-h1 requires its stream to impl it. It appears to be an implementation detail, perhaps it makes decoding simpler. https://github.com/hwchen/async-h1/blob/minimize-deps/src/server/mod.rs#L60 I assume it's possible without Clone, since tokio doesn't seem to need it.

However, at this moment, it's just proof-of-concept stuff so I'm not too worried about it. I just want to understand why it's not working when I think it should.

This is because async-std implements their AsyncRead/AsyncWrite traits on &TcpStream, i.e. a reference to a tcp stream, which is the one in use here. Tokio does not do this.

The purpose of implementing them on &TcpStream is to allow a read and a write concurrently. Unfortunately it would also allow two writes concurrently, and this would cause havoc inside the mio backend. It's not UB, but will be very unpredictable.

Tokio instead opts to provide an api for splitting a tcp stream into a read and write half, which allows the first desirable scenario, but doesn't allow the second undesirable scenario. This does, however, mean that a &TcpStream cannot be written to/read from directly, and you have to be explicit about which half is the read-half and which is the write-half, which it sounds like async-h1 isn't.

1 Like

Thanks, wrapping in a Mutex did work.

And thanks for your comments on the different TcpStream implementations. Just to make sure I understand: wrapping an async-std TcpStream in an Arc still allows the inner TcpStream to be polled, because AsyncRead/AsyncWrite are implemented on a reference to TcpStream, and Arc is a reference?

It's very interesting to get insight into the design; it's not always easy to suss out just by reading the code. Much appreciated.

Yeah. The &*TcpStream uses the deref trait to turn &Arc<TcpStream> into &TcpStream. So it creates a &mut &TcpStream, i.e. a &mut T where T: AsyncRead, and then it calls poll.

This topic was automatically closed 90 days after the last reply. New replies are no longer allowed.