How to use bytes::BytesMut correctly

Hi

I am using BytesMut from bytes v0.5 as below.

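use std::pin::Pin;
use std::task::{Context, Poll::{self, Pending, Ready}};

use bytes::{Bytes, BytesMut};
use futures::stream::Stream;
use tokio::io::AsyncRead;

pub struct ByteStream<R>(pub R);

impl<R: AsyncRead + Unpin> Stream for ByteStream<R> {
    type Item = Bytes;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        let mut buf = BytesMut::with_capacity(1024 * 64 - 255);
        // Pretend the whole allocation is initialized so poll_read can fill it.
        unsafe {
            buf.set_len(buf.capacity());
        }
        match Pin::new(&mut self.0).poll_read(cx, &mut buf) {
            Ready(Ok(n)) => {
                if n == 0 {
                    // EOF: end the stream
                    Ready(None)
                } else {
                    // keep only the bytes that were actually read
                    buf.truncate(n);
                    Ready(Some(buf.freeze()))
                }
            }
            // read errors also end the stream
            Ready(Err(_)) => Ready(None),
            Pending => Pending,
        }
    }
}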

I used unsafe set_len. Is there a way to improve the above code?

Your code has undefined behaviour as you are passing uninitialized memory to poll_read without first passing it to prepare_uninitialized_buffer.
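For comparison, the unsafe set_len can be avoided entirely by zero-filling the buffer before the reader sees it, at the cost of writing about 64 KiB of zeros on every poll. The sketch below is synchronous and uses std only: std::io::Read and Vec<u8> stand in for AsyncRead and BytesMut, and read_chunk is a hypothetical name. With bytes 0.5 the corresponding step should be BytesMut::resize(len, 0) in place of set_len, but treat that as an assumption to check against the bytes docs.

```rust
use std::io::{self, Read};

/// Same chunk size as the code in the question.
const CHUNK: usize = 1024 * 64 - 255;

/// Read one chunk without `unsafe`. `Vec<u8>` and `std::io::Read` are
/// stand-ins for `BytesMut` and `AsyncRead`.
fn read_chunk<R: Read>(reader: &mut R) -> io::Result<Option<Vec<u8>>> {
    let mut buf: Vec<u8> = Vec::with_capacity(CHUNK);
    // Zero-fill up to the chunk size: every byte the reader can see is
    // initialized, so no `set_len` over uninitialized memory is needed.
    buf.resize(CHUNK, 0);
    let n = reader.read(&mut buf)?;
    if n == 0 {
        // EOF, like `Ready(None)` in the question.
        return Ok(None);
    }
    // Drop the zeros the reader did not overwrite.
    buf.truncate(n);
    Ok(Some(buf))
}
```

Reading from a byte slice yields a full chunk, then the remainder, then None at EOF.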

Note that tokio provides AsyncRead::poll_read_buf, which does this for you in the case of a BytesMut. If you are interested in the details of how it works, you can read its implementation in the tokio source.
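Roughly, a poll_read_buf-style read works on the buffer's spare capacity: it exposes the space after the current contents to the reader, reads into it, and then advances the length by exactly n (advance_mut(n) for a BytesMut). The following is a simplified synchronous sketch of that idea, not tokio's code. read_buf is a hypothetical name, Vec<u8> stands in for BytesMut, and the spare capacity is zero-filled with resize. As far as I know, the default prepare_uninitialized_buffer in tokio 0.2 also zeroes the buffer, while readers that never read from it may override it to skip that work.

```rust
use std::io::{self, Read};

/// Hypothetical analogue of `poll_read_buf`: read into the spare capacity
/// of `buf` (after its current contents) and grow `buf` by the bytes read.
fn read_buf<R: Read>(reader: &mut R, buf: &mut Vec<u8>) -> io::Result<usize> {
    let start = buf.len();
    let end = buf.capacity();
    if start == end {
        // No spare capacity left, so nothing can be read.
        return Ok(0);
    }
    // Initialize the spare capacity (resizing up to capacity does not reallocate).
    buf.resize(end, 0);
    match reader.read(&mut buf[start..]) {
        Ok(n) => {
            // Keep the old contents plus the `n` new bytes: the "advance by n" step.
            buf.truncate(start + n);
            Ok(n)
        }
        Err(e) => {
            // On error, restore the original length.
            buf.truncate(start);
            Err(e)
        }
    }
}
```

Because new bytes are appended after the existing contents, repeated calls accumulate data in one buffer for as long as capacity remains.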

This is how I'd do it:

use std::pin::Pin;
use std::task::{Context, Poll};

use bytes::{Bytes, BytesMut};
use futures::stream::Stream;
use pin_project::pin_project;
use tokio::io::AsyncRead;

#[pin_project]
pub struct ByteStream<R: AsyncRead>(#[pin] pub R);

impl<R: AsyncRead> Stream for ByteStream<R> {
    type Item = Bytes;

    fn poll_next(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // Pin projection gives a `Pin<&mut R>`, so no `R: Unpin` bound is needed.
        let this = self.project();
        let mut buf = BytesMut::with_capacity(1024 * 64 - 255);
        this.0
            .poll_read_buf(ctx, &mut buf) // calls `buf.advance_mut(n)`
            .map(|it| match it {
                Ok(n) if n != 0 => Some(buf.freeze()),
                // EOF and errors both end the stream
                _ => None,
            })
    }
}
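The .map(...) at the end relies on Poll::map, which passes Pending through untouched and only transforms a Ready value. That decision logic can be pulled out and checked with std types alone; into_item is a hypothetical helper, and Vec<u8> again stands in for BytesMut/Bytes.

```rust
use std::io;
use std::task::Poll;

/// Hypothetical helper isolating the `.map(...)` step of `poll_next`:
/// turn the outcome of a read into the next stream item.
fn into_item(polled: Poll<io::Result<usize>>, buf: Vec<u8>) -> Poll<Option<Vec<u8>>> {
    // `Poll::map` only runs the closure for `Ready`; `Pending` is returned as-is.
    polled.map(|res| match res {
        // Some bytes were read: yield them.
        Ok(n) if n != 0 => Some(buf),
        // EOF (`Ok(0)`) and errors both end the stream.
        _ => None,
    })
}
```

As in both versions above, a read error is indistinguishable from EOF here; if the consumer needs to see errors, an Item type of io::Result<Bytes> would be one alternative.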

As @alice said, your code is currently considered UB; avoiding that is exactly what .poll_read_buf() is for. The irony is that the current implementation of .poll_read_buf() does roughly what you were already doing, so the UB is still there, just inside tokio. Hopefully that will be fixed in the long run.
