Tokio-streams example

Cannot reproduce the example shown here

use tokio_stream::Stream;

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};

#[derive(Debug)]
struct Delay {
    when: Instant,
}

impl Future for Delay {
    type Output = &'static str;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if Instant::now() < self.when {
            Poll::Ready("done")
        } else {
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

impl Stream for Interval {
    type Item = ();

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        if self.rem == 0 {
            return Poll::Ready(None);
        }

        match Pin::new(&mut self.delay).poll(cx) {
            Poll::Pending => Poll::Pending,
            Poll::Ready(_) => {
                let when = self.delay.when + Duration::from_millis(1000);
                self.delay = Delay { when };
                self.rem -= 1;
                Poll::Ready(Some(()))
            }
        }
    }
}

struct Interval {
    rem: usize,
    delay: Delay,
}

#[tokio::main]
async fn main() {
    let when = Instant::now() + Duration::from_millis(1000);
    let delay = Delay { when };

    let mut int = Interval { rem: 10, delay };

    use tokio_stream::StreamExt;

    while let Some(_) = int.next().await {
        println!("sleeping for some time");
    }
}

All the elements are emitted immediately without delay.

I think your comparison is backwards

If when is in the future it will be larger than now on the first poll, so that will evaluate to true immediately

thanks, I missed that

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.