I am trying to write the streaming equivalent of a Source, but it is not clear to me how to make it work, because I do not know how to handle the lifetime of the reader.
    use futures::io::AsyncRead;
    use futures::Stream;

    pub struct PageStream<'a, R: AsyncRead> {
        reader: &'a mut R,
    }

    impl<'a, R: AsyncRead> Stream for PageStream<'a, R> {
        type Item = std::io::Result<Vec<u8>>;

        fn poll_next(
            self: std::pin::Pin<&mut Self>,
            cx: &mut std::task::Context<'_>,
        ) -> std::task::Poll<Option<Self::Item>> {
            // how to call read_and_process and return result?
            todo!()
        }
    }

    async fn read_and_process<R: AsyncRead>(reader: &mut R) -> Option<Vec<u8>> {
        // opaque, calls `reader.read_exact().await?;` or something.
        todo!()
    }
I suspect that I need to keep the future returned by read_and_process as state inside PageStream, but I am not sure how to pass the reader as &mut to that future without breaking ownership.
Maybe the reader should be passed to PageStream by value and be cloneable, so that it can be handed to the future correctly? I am not sure, though, because I expect to process one element at a time, in which case I would only expect the reader to need to be Send.
Assuming each call to poll_read returns exactly the data you expect (one complete item per read), this is one way of doing it:
    use futures::AsyncRead;
    use futures::Stream;
    use std::pin::Pin;
    use std::task::{Context, Poll};

    pub struct PageStream<'a, R> {
        reader: &'a mut R,
        // Scratch buffer; its length caps how many bytes a single read can return.
        buf: Vec<u8>,
    }

    impl<'a, R: AsyncRead + Unpin> Stream for PageStream<'a, R> {
        type Item = std::io::Result<Vec<u8>>;

        fn poll_next(
            mut self: Pin<&mut Self>,
            cx: &mut Context<'_>,
        ) -> Poll<Option<Self::Item>> {
            // Borrow both fields at once so reader and buf can be used together.
            let Self { reader, buf } = &mut *self;
            match futures::ready!(Pin::new(reader).poll_read(cx, &mut buf[..])) {
                // A read of zero bytes signals end of input, so end the stream.
                Ok(0) => Poll::Ready(None),
                Ok(len) => {
                    // this is not performant. You should use a Codec instead
                    let ret = buf[..len].to_vec();
                    Poll::Ready(Some(Ok(ret)))
                }
                Err(err) => Poll::Ready(Some(Err(err))),
            }
        }
    }
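You also want to make sure buf is initialized with a non-zero length, e.g. vec![0u8; MAX_EXPECTED_READ_LEN], because poll_read can only fill the slice it is given and an empty buffer always reads zero bytes. If you don't know what that max value might be, use a LengthDelimitedCodec (from tokio_util) instead to work with frames.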
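To illustrate what working with frames means, here is a minimal std-only sketch of length-delimited framing. It is not tokio_util's implementation: take_frame is a hypothetical helper, and the 4-byte big-endian length prefix is an assumption made for the example. The idea is that bytes accumulate in a buffer across reads, and an item is only produced once its whole payload has arrived, so no maximum read length has to be guessed.

```rust
// Hypothetical helper: removes and returns one frame from `buf` if a complete
// one is buffered. Assumes each frame is a 4-byte big-endian length followed
// by that many payload bytes.
fn take_frame(buf: &mut Vec<u8>) -> Option<Vec<u8>> {
    const HEADER_LEN: usize = 4;
    if buf.len() < HEADER_LEN {
        return None; // length prefix not fully received yet
    }
    let mut header = [0u8; HEADER_LEN];
    header.copy_from_slice(&buf[..HEADER_LEN]);
    let frame_len = u32::from_be_bytes(header) as usize;
    if buf.len() < HEADER_LEN + frame_len {
        return None; // payload not fully received yet
    }
    // Copy the payload out, then drop the header and payload from the buffer.
    let frame = buf[HEADER_LEN..HEADER_LEN + frame_len].to_vec();
    buf.drain(..HEADER_LEN + frame_len);
    Some(frame)
}
```

In a stream built this way, each read would append to the buffer and poll_next would yield an item only when take_frame returns Some.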
You can't poll an async function directly: calling it only creates a future, and it is that future which gets polled. Doing this by hand is also unusual. If you absolutely must drive a custom async function from poll_next, wrap the future it returns in a Pin<Box<dyn Future<Output = T>>>, store that wrapped future inside the PageStream struct, and poll the stored future inside poll_next.
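A minimal sketch of that boxed-future approach, under several assumptions. To stay free of extra crates, read_and_process here is a hypothetical stand-in built on blocking std::io::Read with a fixed 4-byte page, and poll_next is an inherent method with the same shape as Stream::poll_next rather than a trait impl. State, ReadFuture, NoopWake and collect_pages are illustrative names. The reader is owned by the stream and moved into the boxed future, which hands it back together with each result, so no &mut borrow has to live across polls.

```rust
use std::future::Future;
use std::io::Read;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Boxed future that owns the reader and returns it alongside the result.
type ReadFuture<R> = Pin<Box<dyn Future<Output = (R, Option<Vec<u8>>)>>>;

// Hypothetical stand-in for the opaque read_and_process: reads one 4-byte page.
// It uses blocking std::io::Read only so the sketch needs no extra crates.
async fn read_and_process<R: Read>(mut reader: R) -> (R, Option<Vec<u8>>) {
    let mut page = vec![0u8; 4];
    let item = reader.read_exact(&mut page).ok().map(|_| page);
    (reader, item)
}

enum State<R> {
    Idle(R),                // no read in flight; the stream holds the reader
    Reading(ReadFuture<R>), // a read is in flight; the future holds the reader
    Done,                   // the stream has ended
}

pub struct PageStream<R> {
    state: State<R>,
}

impl<R: Read + Unpin + 'static> PageStream<R> {
    pub fn new(reader: R) -> Self {
        Self { state: State::Idle(reader) }
    }

    // Same shape as Stream::poll_next, written as an inherent method.
    pub fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Vec<u8>>> {
        let this = self.get_mut();
        // Take the current state; start a new future if none is in flight.
        let mut fut = match std::mem::replace(&mut this.state, State::Done) {
            State::Idle(reader) => Box::pin(read_and_process(reader)) as ReadFuture<R>,
            State::Reading(fut) => fut,
            State::Done => return Poll::Ready(None),
        };
        match fut.as_mut().poll(cx) {
            Poll::Pending => {
                // Keep the future so the next poll resumes it.
                this.state = State::Reading(fut);
                Poll::Pending
            }
            Poll::Ready((reader, Some(page))) => {
                // Take the reader back so the next poll can start a new read.
                this.state = State::Idle(reader);
                Poll::Ready(Some(page))
            }
            // None from read_and_process ends the stream; state stays Done.
            Poll::Ready((_, None)) => Poll::Ready(None),
        }
    }
}

// Waker that does nothing, enough to drive the sketch by hand.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Polls the stream to completion and collects every page it yields.
pub fn collect_pages(input: Vec<u8>) -> Vec<Vec<u8>> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut stream = PageStream::new(std::io::Cursor::new(input));
    let mut pages = Vec::new();
    // The stand-in future never returns Pending, so a plain loop is enough here.
    while let Poll::Ready(Some(page)) = Pin::new(&mut stream).poll_next(&mut cx) {
        pages.push(page);
    }
    pages
}
```

With the futures crate, the same state machine would go inside an impl of Stream, and read_and_process would take an AsyncRead. If all you need is to turn an async function into a stream, futures::stream::unfold threads owned state through an async closure and may save you from writing poll_next at all.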