Help polling a pinned future inside a stream

My overarching goal is to implement a "timeout stream" in which the stream can take a maximum amount of time to for each value to yield before giving up. I'm looking for something like tokio_timer::TimeoutStream - Rust but for async-std. The unstable async-std timeout() method on StreamExt (tokio_timer::TimeoutStream - Rust) doesn't seem to match that behaviour; it times out after a certain amount of time has passed from the time the stream has initialized, rather than since the last value was emitted.

My secondary goal is to understand how to make multiple futures and streams cooperate with pinning. Here's how I've implemented the stream:


struct TimeoutStream<I: Unpin, S: Stream<Item = I> + Unpin> {
    inner: S,
    timeout: Duration,
    timeout_future: Pin<Box<dyn Future<Output = ()>>>,
}

impl<I: Unpin, S: Stream<Item = I> + Unpin> Stream for TimeoutStream<I, S> {
    type Item = Result<I, TimeoutError>;
    fn poll_next(mut self: Pin<&mut Self>, context: &mut Context) -> Poll<Option<Self::Item>> {
        let inner = &mut self.inner;
        pin_mut!(inner);
        match inner.poll_next(context) {
            Poll::Pending => match self.timeout_future.poll(context) {},
            Poll::Ready(Some(item)) => {
                self.timeout_future = Box::pin(spawn(sleep(self.timeout)));
                Poll::Ready(Some(Ok(item)))
            }
            Poll::Ready(None) => Poll::Ready(None),
        }
    }
}

The timeout error comes on the match self.timeout_future.poll(context) line, and is no method named 'poll' found for struct 'std::pin::Pin<std::boxed::Box<(dyn core::future::future::Future<Output = ()> + 'static)>>' in the current scope. I've managed a wide collection of other compile errors in my experiments. The one that seemed most likely to succeed was when I tried to extract and pin_mut!() timeout_future as I did for inner. That didn't work though because I was taking out two mutable references to self.

I've been struggling with streams and futures for several months now, and every time I think I finally understand them, something else comes up. It seems like variations on this pattern should be quite common, and I'd like to know what it is I keep misunderstanding when I try to implement such things.

Here's my full repo, though it doesn't build on the playground due to async-std not being available: Rust Playground

You can only call poll on an Pin<&mut F> for F some future. In the case of your Pin<Box<dyn Future>> i believe you can do as_mut to reborrow it as an Pin<&dyn mut Future>.

The pinning macros are meant for async functions, not inside a poll impl.

As for the stream, you can just do Pin::new(&mut the_stream) as it is Unpin.

1 Like

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.