My overarching goal is to implement a "timeout stream": a stream that allows a maximum amount of time for each value to be yielded before giving up. I'm looking for something like tokio_timer::TimeoutStream, but for async-std. The unstable async-std timeout() method on StreamExt doesn't seem to match that behaviour; it times out once a certain amount of time has passed since the stream was initialized, rather than since the last value was emitted.
My secondary goal is to understand how to make multiple futures and streams cooperate with pinning. Here's how I've implemented the stream:
    struct TimeoutStream<I: Unpin, S: Stream<Item = I> + Unpin> {
        inner: S,
        timeout: Duration,
        timeout_future: Pin<Box<dyn Future<Output = ()>>>,
    }

    impl<I: Unpin, S: Stream<Item = I> + Unpin> Stream for TimeoutStream<I, S> {
        type Item = Result<I, TimeoutError>;

        fn poll_next(mut self: Pin<&mut Self>, context: &mut Context) -> Poll<Option<Self::Item>> {
            let inner = &mut self.inner;
            pin_mut!(inner);
            match inner.poll_next(context) {
                Poll::Pending => match self.timeout_future.poll(context) {},
                Poll::Ready(Some(item)) => {
                    self.timeout_future = Box::pin(spawn(sleep(self.timeout)));
                    Poll::Ready(Some(Ok(item)))
                }
                Poll::Ready(None) => Poll::Ready(None),
            }
        }
    }
The compile error occurs on the match self.timeout_future.poll(context) line: no method named 'poll' found for struct 'std::pin::Pin<std::boxed::Box<(dyn core::future::future::Future<Output = ()> + 'static)>>' in the current scope. I've run into a wide collection of other compile errors in my experiments. The attempt that seemed most likely to succeed was extracting timeout_future and calling pin_mut!() on it, as I did for inner. That didn't work, though, because I was taking out two mutable references to self.
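For comparison, here is a minimal sketch of one way the two problems described above (calling poll on the boxed future, and borrowing two fields of self mutably) might be sidestepped. It is only an illustration under stated assumptions, not a definitive implementation. To keep it buildable with the standard library alone, the Stream trait, the Sleep timer, the sleep helper, the Script test stream, and poll_once are all hypothetical stand-ins; real code would presumably use futures::Stream and async_std::task::sleep instead. The sketch also assumes that a timeout should yield an error item and restart the timer, rather than end the stream.

```rust
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::time::{Duration, Instant};

// Stand-in for `futures::Stream` (assumption: same shape as the real trait).
pub trait Stream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

#[derive(Debug, PartialEq)]
pub struct TimeoutError;

// Toy timer: ready once the deadline has passed. It asks to be re-polled
// immediately instead of registering with a reactor, so it busy-polls;
// a real timer such as `async_std::task::sleep` would not.
struct Sleep(Instant);

impl Future for Sleep {
    type Output = ();
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if Instant::now() >= self.0 {
            Poll::Ready(())
        } else {
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

fn sleep(duration: Duration) -> Pin<Box<dyn Future<Output = ()>>> {
    Box::pin(Sleep(Instant::now() + duration))
}

pub struct TimeoutStream<S> {
    inner: S,
    timeout: Duration,
    timeout_future: Pin<Box<dyn Future<Output = ()>>>,
}

impl<S> TimeoutStream<S> {
    pub fn new(inner: S, timeout: Duration) -> Self {
        Self { inner, timeout, timeout_future: sleep(timeout) }
    }
}

impl<S: Stream + Unpin> Stream for TimeoutStream<S> {
    type Item = Result<S::Item, TimeoutError>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // Every field is Unpin, so Self is Unpin and `get_mut` is available.
        // A plain `&mut Self` lets the borrow checker see disjoint field borrows.
        let this = self.get_mut();
        match Pin::new(&mut this.inner).poll_next(cx) {
            Poll::Ready(Some(item)) => {
                // A value arrived in time: restart the per-item timer.
                this.timeout_future = sleep(this.timeout);
                Poll::Ready(Some(Ok(item)))
            }
            Poll::Ready(None) => Poll::Ready(None),
            // `as_mut()` turns Pin<Box<dyn Future>> into Pin<&mut dyn Future>,
            // which is the receiver type that `Future::poll` expects.
            Poll::Pending => match this.timeout_future.as_mut().poll(cx) {
                Poll::Ready(()) => {
                    this.timeout_future = sleep(this.timeout);
                    Poll::Ready(Some(Err(TimeoutError)))
                }
                Poll::Pending => Poll::Pending,
            },
        }
    }
}

// Hypothetical test stream that replays a fixed script of poll results.
pub struct Script(pub VecDeque<Poll<Option<u32>>>);

impl Stream for Script {
    type Item = u32;
    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<u32>> {
        self.0.pop_front().unwrap_or(Poll::Ready(None))
    }
}

struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Polls a stream once with a waker that does nothing.
pub fn poll_once<S: Stream + Unpin>(stream: &mut S) -> Poll<Option<S::Item>> {
    let waker = Waker::from(Arc::new(NoopWake));
    Pin::new(stream).poll_next(&mut Context::from_waker(&waker))
}
```

The two changes that matter are self.get_mut(), which hands back an ordinary &mut Self so that inner and timeout_future can be borrowed independently, and timeout_future.as_mut().poll(cx), which supplies the pinned receiver that poll requires. If the inner stream were not Unpin, a field-projection crate such as pin-project would likely be needed in place of get_mut().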
I've been struggling with streams and futures for several months now, and every time I think I finally understand them, something else comes up. It seems like variations on this pattern should be quite common, and I'd like to know what it is I keep misunderstanding when I try to implement such things.
Here's my full repo, though it doesn't build on the playground due to async-std not being available: Rust Playground