How to wrap an async Stream?

I'm trying to write a generic wrapper for a Stream impl, and the compiler says that the inner Stream has no poll_next().

If `inner` implements Stream, I expected it to have a .poll_next() method.
I'm not sure what I'm missing.

error[E0599]: no method named `poll_next` found for type parameter `St` in the current scope
  --> src/lib.rs:20:20
   |
20 |         self.inner.poll_next(cx)
   |                    ^^^^^^^^^ method not found in `St`
   |
   = help: items from traits can only be used if the type parameter is bounded by the trait

use std::pin::Pin;
use futures::{
    prelude::*,
    task::{Context, Poll},
};

pub struct FooStream<T, E, St>
where
    St: Stream<Item = Result<T, E>>,
{
    inner: St,
}
impl<T, E, St: Stream<Item = Result<T, E>>> Stream for FooStream<T, E, St>
where
    St: Stream<Item = Result<T, E>>,
{
    type Item = Result<T, E>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        self.inner.poll_next(cx)
    }
}

Stream::poll_next() takes self: Pin<&mut Self>, so it has to be called on a Pin<&mut St> rather than on a plain St. You need to create a Pin<&mut St> from the Pin<&mut FooStream<T, E, St>>; this is called pin projection.

Try using the pin-project or pin-project-lite crate.
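
If you can afford to require St: Unpin, there is also a way to do this without a projection crate, because Pin::new is safe for Unpin types. Here is a minimal sketch of that variant. To keep it buildable with std alone, `Stream` is a local stand-in with the same poll_next signature as futures::Stream, and `VecStream`, `NoopWaker`, `drain_ready`, and `demo` are hypothetical helpers that exist only for this illustration.

```rust
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Assumption: local stand-in for `futures::Stream` (same `poll_next`
// signature), declared here only so the sketch builds without extra crates.
pub trait Stream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

pub struct FooStream<St> {
    inner: St,
}

impl<St> Stream for FooStream<St>
where
    St: Stream + Unpin,
{
    type Item = St::Item;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // `FooStream<St>` is `Unpin` whenever `St` is, so the pinned `self`
        // can be mutably dereferenced and the field re-pinned with `Pin::new`.
        Pin::new(&mut self.inner).poll_next(cx)
    }
}

// Hypothetical test stream: yields the items of a Vec and is always ready.
pub struct VecStream {
    items: std::vec::IntoIter<u32>,
}

impl Stream for VecStream {
    type Item = u32;

    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<u32>> {
        Poll::Ready(self.items.next())
    }
}

// Hypothetical waker that does nothing when woken.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Polls the stream in a loop and collects items until it reports the end
// of the stream or returns `Pending`.
pub fn drain_ready<St: Stream + Unpin>(mut stream: St) -> Vec<St::Item> {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut out = Vec::new();
    while let Poll::Ready(Some(item)) = Pin::new(&mut stream).poll_next(&mut cx) {
        out.push(item);
    }
    out
}

// Wraps a `VecStream` in `FooStream` and collects what comes out.
pub fn demo() -> Vec<u32> {
    let inner = VecStream { items: vec![1, 2, 3].into_iter() };
    drain_ready(FooStream { inner })
}
```

The trade-off is that callers with a non-Unpin stream (many async-generated streams are not Unpin) would have to Box::pin it first, which is why a projection crate is usually the more general choice.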

Thanks for the references!
Here's the working example.

use std::pin::Pin;
use futures::{
    prelude::*,
    task::{Context, Poll},
};
use pin_project_lite::pin_project;

pin_project! {
    pub struct FooStream<St>
    where
        St: Stream,
    {
        #[pin]
        inner: St,
    }
}
impl<St> Stream for FooStream<St>
where
    St: Stream,
{
    type Item = St::Item;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.project();
        this.inner.poll_next(cx)
    }
}
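
For intuition, the project() call above does roughly the same job as the hand-written projection in the sketch below. This is not what pin-project-lite literally expands to, only an approximation under stated assumptions: `Stream` is again a local stand-in for futures::Stream so that the snippet builds with std alone, and `Countdown`, `NoopWaker`, and `demo` are hypothetical names used just for the illustration.

```rust
use std::marker::PhantomPinned;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Assumption: local stand-in for `futures::Stream` (same `poll_next`
// signature), declared here only so the sketch builds without extra crates.
pub trait Stream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

pub struct FooStream<St> {
    inner: St,
}

impl<St: Stream> Stream for FooStream<St> {
    type Item = St::Item;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // SAFETY: `inner` is treated as structurally pinned. It is never
        // moved out of `self`, and `FooStream` has no `Drop` impl, no manual
        // `Unpin` impl, and is not `#[repr(packed)]`.
        let inner: Pin<&mut St> = unsafe { self.map_unchecked_mut(|this| &mut this.inner) };
        inner.poll_next(cx)
    }
}

// Hypothetical test stream that is deliberately `!Unpin`.
// It counts down from `left` to 1 and then ends.
pub struct Countdown {
    left: u32,
    _pin: PhantomPinned,
}

impl Stream for Countdown {
    type Item = u32;

    fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<u32>> {
        // SAFETY: only a plain integer field is mutated; nothing is moved.
        let this = unsafe { self.get_unchecked_mut() };
        if this.left == 0 {
            return Poll::Ready(None);
        }
        let n = this.left;
        this.left -= 1;
        Poll::Ready(Some(n))
    }
}

// Hypothetical waker that does nothing when woken.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Pins the wrapper on the heap and polls it until the stream ends.
pub fn demo() -> Vec<u32> {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut stream = Box::pin(FooStream {
        inner: Countdown { left: 3, _pin: PhantomPinned },
    });
    let mut out = Vec::new();
    while let Poll::Ready(Some(n)) = stream.as_mut().poll_next(&mut cx) {
        out.push(n);
    }
    out
}
```

The point of using pin-project or pin-project-lite is that the macro writes this projection for you and checks those safety conditions, so your own code needs no unsafe block.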