[tokio] Fusing an AsyncReadExt::read() future?

(Using tokio head from the git repo)

I have read half of a tokio TcpStream/TlsStream and I want to use it in a select! block which is in a loop. The problem is after every read the read buffer should be reset, but read() can't do it (because if it does I won't be able to read the contents of the buffer after it returns), and I can't do it either because the fused reader has a mutable reference to the read buffer so I have to drop the it first (which I can't, otherwise I lose the is_terminated info).

Here's the code: https://github.com/osa1/tiny/blob/master/libtiny_client/src/lib.rs#L497

I want to

  • Move the fuse() part out of the loop
  • But still be able to reset read_buf after every successful read.

Anyone know a way to do this?

Someone on the tokio gitter channel suggested fusing the read half first, and then calling read(&mut read_buf) on it, but futures_util::io::ReadHalf doesn't have a FutureExt impl so that doesn't work.

Thanks

You might be able to do something like

let mut read_buf: [u8; 1024] = [0; 1024];
let mut reading = Some(read_half.read(&mut read_buf).fuse());

loop {
    select! {
        ...
        bytes = reading.as_mut().unwrap() => {
            reading.take();
            ...
            reading = Some(read_half.read(&mut read_buf).fuse());
        }
        ...
    }
}

But probably easier would be to use an abstraction like tokio-codec to handle polling the AsyncRead for you and give back a Stream of messages, then all the partial reading would be handled internally, and you can fuse the stream and just select over whether it has a new message ready or not.

(And if you don't want to use tokio-codec for some reason, you can implement the same conversion directly yourself, so that you're not having to try and weave this partially-read state in with your main select loop).

@Nemo157

You might be able to do something like

I don't understand how that can possibly work -- that has the same problem that
I'm trying to fix: it fuses in every iteration (losing is_terminated info),
instead of fusing once and reusing the fuse struct in every iteration.

But probably easier would be to use an abstraction like tokio-codec

I just took a quick look at tokio-codec and it seems like it should solve my
problem (though I haven't confirmed yet). I also had a quick look at the code,
and I don't see any StreamExt or FutureExt implementations in the package so
I'm wondering how it does this. Shouldn't it have the same problem that I'm
having? It'd be appreciated if someone could point me to the right location in
the source code that implements fuse() in tokio-codec.

Tokio-codec might not implement FusedStream itself, but the generic stream fuse adaptor should work with it.

Yeah, I guess the only way to make directly reading work would be to add FusedAsyncRead and make Read delegate it’s FusedFuture impl through that.

Actually, the read loop I show up there can work with one modification:

let mut read_buf: [u8; 1024] = [0; 1024];
let mut reading = Some(read_half.read(&mut read_buf).fuse());

loop {
    select! {
        ...
        bytes = reading.as_mut().unwrap() => {
            if bytes != 0 {
                reading.take();
                ...
                reading = Some(read_half.read(&mut read_buf).fuse());
            }
        }
        ...
    }
}

By only replacing the future when something was read it will retain it's terminated state when reaching EOF.

Interesting idea.. I guess I could also implement a ReadHalf wrapper that stores if the last read() returned 0 bytes, and implement is_terminated using that. Using this I can first fuse the ReadHalf (using my wrapper), and then use the fused struct in select!.

Using FramedReader would probably be easier though. I just need to refactor my parser to use BytesMut instead of Vec<u8>.

This topic was automatically closed 90 days after the last reply. New replies are no longer allowed.