Implement `Stream` from an `AsyncRead`

I am trying to write the equivalent of a Source in streaming, but it is not clear to me how to make it work, in particular how to handle the lifetime of the reader.

use futures::io::AsyncRead;

pub struct PageStream<'a, R: AsyncRead> {
    reader: &'a mut R,
}

impl<'a, R: AsyncRead> Stream for PageStream<'a, R> {
    type Item = Result<Vec<u8>>;

    fn poll_next(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
         // how to call read_and_process and return result?
    }
}

async fn read_and_process<R: AsyncRead>(reader: &mut R) -> Option<Vec<u8>> {
     // opaque, calls `reader.read_exact().await?;` or something.
}

I suspect that I need to store the read_and_process future as some form of state, but I am not sure how to pass the reader as &mut to the future without breaking the ownership rules.

Maybe the reader should be passed to PageStream by value and be cloneable, so that it can be passed to the future correctly? I am not sure, though, because I expect to process one element at a time, in which case I would only expect the reader to need to be Send.

Thanks in advance

Assuming each call to poll_read gives you exactly the data you expect, this is one way of doing it:

        use futures::AsyncRead;
        use futures::Stream;
        use std::task::Poll;
        use std::pin::Pin;

        pub struct PageStream<'a, R> {
            reader: &'a mut R,
            buf: Vec<u8>
        }

        impl<'a, R: AsyncRead + Unpin> Stream for PageStream<'a, R> {
            type Item = std::io::Result<Vec<u8>>;

            fn poll_next(
                mut self: std::pin::Pin<&mut Self>,
                cx: &mut std::task::Context<'_>,
            ) -> std::task::Poll<Option<Self::Item>> {
                let Self {
                    reader,
                    buf
                } = &mut *self;

                match futures::ready!(Pin::new(reader).poll_read(cx, &mut buf[..])) {
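                    // With a non-empty buf, a read of 0 bytes means EOF, so end the stream
                    // instead of yielding empty pages forever.
                    Ok(0) => Poll::Ready(None),
                    Ok(len) => {
                        // Copying on every read is not performant. You should use a Codec instead
                        let ret = buf[..len].to_vec();
                        Poll::Ready(Some(Ok(ret)))
                    }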

                    Err(err) => Poll::Ready(Some(Err(err)))
                }
            }
        }

You also want to make sure the buf is initialized via vec![0u8; MAX_EXPECTED_READ_LEN]. If you don't know what that max value might be, use a LengthDelimitedCodec instead to work with frames.
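
To see why the buffer has to be filled rather than just allocated, here is a small sketch. It swaps in the synchronous std::io::Read on a byte slice as a stand-in for poll_read, purely to keep the example free of dependencies. The helper read_once and the value chosen for MAX_EXPECTED_READ_LEN are hypothetical.

```rust
use std::io::Read;

// Hypothetical upper bound on a single read, chosen only for illustration.
const MAX_EXPECTED_READ_LEN: usize = 8;

// Reads once from a fixed 5-byte source into `buf` and returns the byte count.
fn read_once(buf: &mut Vec<u8>) -> usize {
    let mut source: &[u8] = b"hello";
    source
        .read(&mut buf[..])
        .expect("reading from a slice cannot fail")
}

fn main() {
    // Capacity is reserved, but the length is 0, so `&mut buf[..]` is empty
    // and the read has nowhere to put any bytes.
    let mut reserved: Vec<u8> = Vec::with_capacity(MAX_EXPECTED_READ_LEN);
    assert_eq!(read_once(&mut reserved), 0);

    // Zero-filled to the full length, so the read has room to write into.
    let mut zeroed = vec![0u8; MAX_EXPECTED_READ_LEN];
    assert_eq!(read_once(&mut zeroed), 5);
    assert_eq!(&zeroed[..5], b"hello");
}
```

The same applies to poll_read: it can only fill the slice it is handed, so a buffer with length 0 would make every read look like EOF.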

You can't poll an async function directly: calling it only gives you a future, and polling such a future by hand is not the usual approach. If you absolutely must poll a custom async function, wrap the future it returns in a Pin<Box<dyn Future<Output=T>>>, store that in the PageStream struct, and poll it inside poll_next.
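
A minimal sketch of that boxing approach, using only the standard library so it can be run without an executor. FutureHolder, poll_once and NoopWaker are hypothetical names, and this read_and_process is a trivial stand-in for the opaque function in the question.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical stand-in for the opaque async function.
async fn read_and_process() -> Vec<u8> {
    vec![1, 2, 3]
}

// Stores the boxed future so that it survives between polls.
struct FutureHolder {
    future: Pin<Box<dyn Future<Output = Vec<u8>>>>,
}

impl FutureHolder {
    fn new() -> Self {
        Self {
            future: Box::pin(read_and_process()),
        }
    }

    // What a poll_next body would do: forward the poll to the stored future.
    fn poll_once(&mut self, cx: &mut Context<'_>) -> Poll<Vec<u8>> {
        self.future.as_mut().poll(cx)
    }
}

// A waker that does nothing, only so the sketch can poll without an executor.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

fn main() {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut holder = FutureHolder::new();
    // This future never awaits anything, so it completes on the first poll.
    assert_eq!(holder.poll_once(&mut cx), Poll::Ready(vec![1, 2, 3]));
}
```

In a real poll_next you would replace the stored future once it completes, because polling a finished async fn future again panics.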

The easy answer is to use the async-stream crate, whose stream! macro lets you write the stream as an async block that yields items.

To answer directly the question of how you can use an async fn, you can do it like this:

use futures::future::BoxFuture;
use futures::io::AsyncRead;
use futures::stream::Stream;
use std::io::Result;
use std::pin::Pin;
use std::task::{Context, Poll};

pub struct PageStream<'a, R> {
    future: Option<BoxFuture<'a, Result<Option<(&'a mut R, Vec<u8>)>>>>,
}

impl<'a, R: AsyncRead + Send + 'a> PageStream<'a, R> {
    pub fn new(reader: &'a mut R) -> Self {
        Self {
            future: Some(Box::pin(read_and_process(reader))),
        }
    }
}

impl<'a, R: AsyncRead + Send + 'a> Stream for PageStream<'a, R> {
    type Item = Result<Vec<u8>>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let me = Pin::into_inner(self);

        match &mut me.future {
            Some(fut) => match fut.as_mut().poll(cx) {
                Poll::Ready(Ok(None)) => {
                    me.future = None;
                    Poll::Ready(None)
                }
                Poll::Ready(Ok(Some((reader, vec)))) => {
                    me.future = Some(Box::pin(read_and_process(reader)));
                    Poll::Ready(Some(Ok(vec)))
                }
                Poll::Ready(Err(err)) => {
                    me.future = None;
                    Poll::Ready(Some(Err(err)))
                }
                Poll::Pending => Poll::Pending,
            },
            None => Poll::Ready(None),
        }
    }
}

async fn read_and_process<R: AsyncRead>(reader: &mut R) -> Result<Option<(&mut R, Vec<u8>)>> {
    todo!()
}