Cannot reproduce the example shown here
use tokio_stream::Stream;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
#[derive(Debug)]
struct Delay {
when: Instant,
}
impl Future for Delay {
type Output = &'static str;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if Instant::now() < self.when {
Poll::Ready("done")
} else {
cx.waker().wake_by_ref();
Poll::Pending
}
}
}
impl Stream for Interval {
type Item = ();
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if self.rem == 0 {
return Poll::Ready(None);
}
match Pin::new(&mut self.delay).poll(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(_) => {
let when = self.delay.when + Duration::from_millis(1000);
self.delay = Delay { when };
self.rem -= 1;
Poll::Ready(Some(()))
}
}
}
}
struct Interval {
rem: usize,
delay: Delay,
}
#[tokio::main]
async fn main() {
let when = Instant::now() + Duration::from_millis(1000);
let delay = Delay { when };
let mut int = Interval { rem: 10, delay };
use tokio_stream::StreamExt;
while let Some(_) = int.next().await {
println!("sleeping for some time");
}
}
All the elements are emitted immediately without delay.