How do async readers read a stream?

Rust's futures are super complex and super confusing :(.

I'm looking at the futures::io::ReadExact source code and I wonder: is it always true that AsyncRead will fill the provided buffer completely?

To be more clear, let's say I read from a TcpStream into a buffer let mut buf = vec![0u8; 6]; and the peer sends the stream in delayed chunks like this:

123 ___ 4 ___ 56 ___ 789

Will Pin::new(&mut this.reader).poll_read(cx, &mut buf) always leave buf either empty (buf=[]) or full (buf=[1,2,3,4,5,6]), or is there another case where the buffer is not filled completely and I should raise an error (e.g. buf=[1,2,3,4,0,0])?

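A single poll_read call may not return enough bytes to fill the output. If only 1, 2, 3 have arrived so far, it returns Poll::Ready(Ok(3)), meaning only buf[..3] holds new data; if nothing has arrived yet, it returns Poll::Pending. The ReadExact future keeps calling it until the buffer is filled, or fails with UnexpectedEof if the reader reaches end-of-stream first.

It can't return too many bytes, though: poll_read only writes into the slice it is given, so n is never larger than buf.len(). The 7, 8, 9 stay in the stream for the next read.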
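The same behaviour can be tried without futures or a real socket, because std's synchronous std::io::Read has the same contract: read may return fewer bytes than the buffer holds, and read_exact loops until the buffer is full. A minimal sketch, assuming a hypothetical ChunkedReader that hands out at most one of the peer's chunks per read call:

```rust
use std::collections::VecDeque;
use std::io::{self, Read};

/// Hypothetical reader that returns at most one of the peer's chunks per
/// `read` call, imitating data that arrives in delayed pieces.
struct ChunkedReader {
    chunks: VecDeque<Vec<u8>>,
}

impl ChunkedReader {
    fn new(chunks: &[&[u8]]) -> Self {
        ChunkedReader { chunks: chunks.iter().map(|c| c.to_vec()).collect() }
    }
}

impl Read for ChunkedReader {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        let Some(chunk) = self.chunks.front_mut() else {
            return Ok(0); // no chunks left: end of stream
        };
        // Never copy more than the caller's buffer can hold.
        let n = chunk.len().min(buf.len());
        buf[..n].copy_from_slice(&chunk[..n]);
        chunk.drain(..n);
        if chunk.is_empty() {
            self.chunks.pop_front();
        }
        Ok(n)
    }
}

fn main() -> io::Result<()> {
    // 123 ___ 4 ___ 56 ___ 789 from the question.
    let chunks: &[&[u8]] = &[&[1, 2, 3], &[4], &[5, 6], &[7, 8, 9]];

    // One `read` only returns what has "arrived": a short read of 3 bytes.
    let mut buf = [0u8; 6];
    let n = ChunkedReader::new(chunks).read(&mut buf)?;
    println!("one read: n = {n}, buf = {buf:?}"); // one read: n = 3, buf = [1, 2, 3, 0, 0, 0]

    // `read_exact` keeps calling `read` until all 6 bytes are filled...
    let mut reader = ChunkedReader::new(chunks);
    let mut buf = [0u8; 6];
    reader.read_exact(&mut buf)?;
    println!("read_exact: {buf:?}"); // read_exact: [1, 2, 3, 4, 5, 6]

    // ...and never takes more: 7, 8, 9 are still in the reader.
    let mut rest = Vec::new();
    reader.read_to_end(&mut rest)?;
    println!("left over: {rest:?}"); // left over: [7, 8, 9]
    Ok(())
}
```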

Thanks, @alice. I understand what ReadExact should do, but the source code confuses me. It looks like as soon as this.buf is filled, the read is treated as completed:

```rust
// Imports needed to compile this excerpt outside the futures crate:
use futures::io::AsyncRead;
use futures::ready;
use std::future::Future;
use std::io;
use std::mem;
use std::pin::Pin;
use std::task::{Context, Poll};

pub struct ReadExact<'a, R: ?Sized> {
    reader: &'a mut R,
    buf: &'a mut [u8],
}

impl<R: ?Sized + Unpin> Unpin for ReadExact<'_, R> {}

impl<'a, R: AsyncRead + ?Sized + Unpin> ReadExact<'a, R> {
    pub(super) fn new(reader: &'a mut R, buf: &'a mut [u8]) -> Self {
        ReadExact { reader, buf }
    }
}

impl<R: AsyncRead + ?Sized + Unpin> Future for ReadExact<'_, R> {
    type Output = io::Result<()>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = &mut *self;
        while !this.buf.is_empty() { // HERE!
            let n = ready!(Pin::new(&mut this.reader).poll_read(cx, this.buf))?;
            {
                let (_, rest) = mem::replace(&mut this.buf, &mut []).split_at_mut(n);
                this.buf = rest;
            }
            if n == 0 {
                return Poll::Ready(Err(io::ErrorKind::UnexpectedEof.into()))
            }
        }
        Poll::Ready(Ok(()))
    }
}
```

Basically, the following code

```rust
let (_, rest) = mem::replace(&mut this.buf, &mut []).split_at_mut(n);
this.buf = rest;
```

is equivalent to this:

```rust
this.buf = &mut this.buf[n..];
```

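The mem::replace is needed because of lifetimes: this.buf has type &'a mut [u8], but it is reached through the &mut borrow of the future, so &mut this.buf[n..] could only live as long as that borrow, which is shorter than 'a. Moving the slice out first (leaving an empty &mut [] in its place) lets split_at_mut hand back a rest that still lives for 'a.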
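The lifetime problem can be reproduced without futures. In this minimal std-only sketch, SliceCursor and advance are hypothetical names; the struct stores a &'a mut [u8] the way ReadExact does. It uses mem::take(&mut x), which is shorthand for mem::replace(&mut x, Default::default()); the default &mut [u8] is an empty slice, so it does the same job as mem::replace(..., &mut []).

```rust
use std::mem;

/// Hypothetical struct that, like ReadExact, stores a borrowed
/// `&'a mut [u8]` and shrinks it as bytes get filled in.
struct SliceCursor<'a> {
    buf: &'a mut [u8],
}

impl<'a> SliceCursor<'a> {
    /// Drop the first `n` bytes of the remaining slice.
    fn advance(&mut self, n: usize) {
        // `self.buf = &mut self.buf[n..];` is rejected here: that reborrow
        // can only live as long as `&mut self`, not the full `'a`.
        // `mem::take` moves the `&'a mut [u8]` out (leaving an empty slice),
        // so the tail returned by `split_at_mut` keeps the lifetime `'a`.
        let (_, rest) = mem::take(&mut self.buf).split_at_mut(n);
        self.buf = rest;
    }
}

fn main() {
    let mut data = [0u8; 6];
    let mut cur = SliceCursor { buf: &mut data };
    cur.buf[..2].copy_from_slice(&[10, 11]); // "read" two bytes
    cur.advance(2);
    println!("still to fill: {}", cur.buf.len()); // still to fill: 4
    println!("data: {:?}", data); // data: [10, 11, 0, 0, 0, 0]
}
```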

Ehm... but isn't the loop actually run only once? As soon as this.buf is filled with some data it stops, right? How does this ensure that self.buf is fully filled?

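Only if n is equal to the length of the original buffer. Otherwise, this.buf is not empty. Note that this.buf is not the data that was read; it is the part of the buffer that still needs filling. Each iteration cuts the n freshly read bytes off its front, so the loop only ends once nothing is left to fill. If poll_read returns Poll::Pending, ready! returns Pending from poll, and because this.buf has already been shrunk, the next call to poll continues where it left off.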

Try playing around with this snippet:


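```rust
fn read_data(slice: &mut [u8]) -> usize {
    // Pretend the reader delivered two bytes (a short read).
    slice[0] = 10;
    slice[1] = 11;
    2
}

fn main() {
    let mut buf = [0; 6];

    // `slice` is the not-yet-filled tail of `buf`, like `this.buf` in ReadExact.
    let mut slice = &mut buf[..];

    while !slice.is_empty() {
        let len = read_data(slice);
        // Cut the bytes that were just written off the front.
        slice = &mut slice[len..];
        println!("slice: {:?}", slice);
    }

    println!("done: {:?}", buf);
}
```

Output:

```
slice: [0, 0, 0, 0]
slice: [0, 0]
slice: []
done: [10, 11, 10, 11, 10, 11]
```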

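The snippet uses a plain loop; to see the same loop surviving Poll::Pending between delayed chunks, here is a std-only sketch. It copies the ReadExact loop but swaps futures' AsyncRead for a hypothetical PollRead trait, with a hypothetical Delayed reader where None stands for a gap in which poll_read returns Poll::Pending. The driver in main is a toy that re-polls immediately and ignores the waker; a real executor would wait to be woken.

```rust
use std::future::Future;
use std::io;
use std::mem;
use std::pin::Pin;
use std::ptr;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

/// Hypothetical std-only stand-in for futures' `AsyncRead`.
trait PollRead {
    fn poll_read(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<io::Result<usize>>;
}

/// Hypothetical stream: `Some(chunk)` = data that has arrived, `None` = a gap.
struct Delayed {
    steps: Vec<Option<Vec<u8>>>,
}

impl PollRead for Delayed {
    fn poll_read(&mut self, _cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<io::Result<usize>> {
        if self.steps.is_empty() {
            return Poll::Ready(Ok(0)); // end of stream
        }
        match self.steps.remove(0) {
            // A real reader would register `_cx.waker()` before returning Pending.
            None => Poll::Pending,
            Some(mut chunk) => {
                let n = chunk.len().min(buf.len());
                buf[..n].copy_from_slice(&chunk[..n]);
                if n < chunk.len() {
                    // Keep the bytes that did not fit for the next call.
                    self.steps.insert(0, Some(chunk.split_off(n)));
                }
                Poll::Ready(Ok(n))
            }
        }
    }
}

/// The same loop as futures' ReadExact, written against `PollRead`.
struct ReadExact<'a, R> {
    reader: &'a mut R,
    buf: &'a mut [u8],
}

impl<R: PollRead> Future for ReadExact<'_, R> {
    type Output = io::Result<()>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = &mut *self;
        while !this.buf.is_empty() {
            // Equivalent to `ready!(...)?`: return Pending, keeping our state.
            let n = match this.reader.poll_read(cx, this.buf) {
                Poll::Ready(res) => res?,
                Poll::Pending => return Poll::Pending,
            };
            // Shrink `buf` to the unfilled tail; it survives until the next poll.
            let (_, rest) = mem::take(&mut this.buf).split_at_mut(n);
            this.buf = rest;
            if n == 0 {
                return Poll::Ready(Err(io::ErrorKind::UnexpectedEof.into()));
            }
        }
        Poll::Ready(Ok(()))
    }
}

/// Waker that does nothing; enough for the toy driver in `main`.
fn noop_waker() -> Waker {
    fn clone(_: *const ()) -> RawWaker {
        RawWaker::new(ptr::null(), &VTABLE)
    }
    fn noop(_: *const ()) {}
    static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, noop, noop, noop);
    // SAFETY: none of the vtable functions use the (null) data pointer.
    unsafe { Waker::from_raw(RawWaker::new(ptr::null(), &VTABLE)) }
}

fn main() {
    // 123 ___ 4 ___ 56 ___ 789, with `None` for each ___ gap.
    let steps = vec![Some(vec![1, 2, 3]), None, Some(vec![4]), None, Some(vec![5, 6]), None, Some(vec![7, 8, 9])];
    let mut reader = Delayed { steps };
    let mut buf = [0u8; 6];
    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);
    let mut fut = ReadExact { reader: &mut reader, buf: &mut buf };

    // Toy driver: re-poll immediately instead of waiting for a wake-up.
    let mut polls = 0;
    let result = loop {
        polls += 1;
        if let Poll::Ready(res) = Pin::new(&mut fut).poll(&mut cx) {
            break res;
        }
    };
    println!("{:?} after {} polls", result.map(|()| buf), polls);
}
```

With the chunks from the question this prints Ok([1, 2, 3, 4, 5, 6]) after 3 polls: each gap makes poll return Pending, and the next poll resumes with the shorter this.buf. The 7, 8, 9 stay unread.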


I see. Thanks!

Btw, what would be the right way to check whether this.reader has data to be read? Let's say I want a TryReadExact where getting 0 bytes back is allowed?

You could do

```rust
use futures::io::AsyncReadExt;

let mut vec = Vec::new();
(&mut your_io_thing).take(num_bytes)
    .read_to_end(&mut vec)
    .await?;
```

or just call read in a loop.
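Both options can be sketched with std's synchronous Read, whose take and read_to_end mirror the futures AsyncReadExt methods. read_up_to below is a hypothetical helper for the "read in a loop" variant: it stops early at end-of-stream instead of erroring, so it may return fewer bytes, including 0. Keep in mind that a read returning Ok(0) into a non-empty buffer means end-of-stream, not "no data yet"; an async reader with nothing available returns Poll::Pending instead.

```rust
use std::io::{self, Read};

/// Hypothetical "TryReadExact": fill as much of `buf` as the reader can
/// provide, stopping early (without an error) at end-of-stream.
/// Returns the number of bytes filled, which may be 0.
fn read_up_to<R: Read>(reader: &mut R, buf: &mut [u8]) -> io::Result<usize> {
    let mut filled = 0;
    while filled < buf.len() {
        match reader.read(&mut buf[filled..]) {
            Ok(0) => break, // end of stream: keep what we have
            Ok(n) => filled += n,
            Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
            Err(e) => return Err(e),
        }
    }
    Ok(filled)
}

fn main() -> io::Result<()> {
    let data = [1u8, 2, 3, 4];

    // std counterpart of `take(num_bytes).read_to_end(...)`:
    // at most 6 bytes, and getting fewer is not an error.
    let mut reader = &data[..];
    let mut vec = Vec::new();
    (&mut reader).take(6).read_to_end(&mut vec)?;
    println!("take: {:?}", vec); // take: [1, 2, 3, 4]

    // Same idea with `read` in a loop.
    let mut reader = &data[..];
    let mut buf = [0u8; 6];
    let n = read_up_to(&mut reader, &mut buf)?;
    println!("loop: n = {}, buf = {:?}", n, buf); // loop: n = 4, buf = [1, 2, 3, 4, 0, 0]
    Ok(())
}
```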

But I don't want to consume the content in your_io_thing. Will this consume the stream?

You can't read from an IO stream without consuming the data you read.
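A buffered reader can look like an exception, but it confirms the point: std's BufReader (futures offers a BufReader and AsyncBufRead counterpart) lets you inspect bytes with fill_buf before consuming them, only because it has already pulled those bytes out of the underlying stream into its own buffer. A minimal std-only sketch, using a byte slice as the underlying stream:

```rust
use std::io::{self, BufRead, BufReader, Read};

fn main() -> io::Result<()> {
    let source: &[u8] = &[1, 2, 3, 4, 5, 6];
    let mut reader = BufReader::new(source);

    // `fill_buf` exposes buffered bytes without marking them consumed...
    let peeked = reader.fill_buf()?.to_vec();
    println!("peeked: {:?}", peeked); // peeked: [1, 2, 3, 4, 5, 6]

    // ...but to get them, BufReader already took them out of `source`.
    println!("left in source: {}", reader.get_ref().len()); // left in source: 0

    // A later read still sees the peeked bytes, served from the buffer.
    let mut buf = [0u8; 6];
    reader.read_exact(&mut buf)?;
    println!("read: {:?}", buf); // read: [1, 2, 3, 4, 5, 6]
    Ok(())
}
```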


I see. Thanks, @alice, as always :).
