Assume that we have a BinaryReader
which contains a async method read_next
impl<R: AsyncRead + AsyncSeek + Unpin + Send> BinaryReader<R> {
// ......
async fn read_next(&mut self) -> Result<Option<OwnedEvent>, Error> {
// ......
todo!()
}
}
Then I want to imply trait Stream
for it, in this situation I may use an additional field to store the future which was called and pinned before, so that we can consume and return it when next wake.
So is there a way to directly support Stream without the need for a wrappers?
Then I tried to introduce a wrapper:
#[pin_project::pin_project]
struct BinaryReaderWrapper<R: AsyncRead + AsyncSeek + Unpin + Send> {
#[pin]
inner: BinaryReader<R>,
#[pin]
last_fut: Option<Pin<Box<dyn Future<Output = Result<Option<OwnedEvent>, Error>>>>>
}
impl<R: AsyncRead + AsyncSeek + Unpin + Send> Stream for BinaryReaderWrapper<R> {
type Item = Result<Option<OwnedEvent>, Error>;
fn poll_next(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Option<Self::Item>> {
// - let's call the lifetime of this reference `'1`
let mut this = self.project();
if this.last_fut.is_none() {
let ret = Box::pin(this.inner.read_next());
*this.last_fut = Some(ret);
// ^^^ lifetime may not live long enough
// ^^^ cast requires that `'1` must outlive `'static`
}
let result = ready!(this.last_fut.as_pin_mut().unwrap().poll(cx));
this.last_fut = None;
result
}
}
But this won't work, the error messages printed by the compiler is marked in the code above.
I can't figure out why lifetime is mismatch here. It seems like BoxFuture need a 'static
, but read_next
called on a mutable reference of inner, which only has a limited lifetime. But why?
When I change
Box::pin(this.inner.read_next())
toBox::pin(async_func_without_a_receiver())
, everything seems ok.
So does anyone know how to write a proper implementation for Stream in this situation?