Hello,
My stream impl seems to be blocking and not yielding for other work the way I would like.
I am trying to return Poll::Pending
every once in a while so that my stream will yield so that other work can be done. I "schedule" the waker so that my stream is polled again, but it seems it doesn't yield much at all and just immediately polls again. My assumption is that I may need to wait for some smallish amount of time with something like tokio::time::sleep
or something just to get it to actually yield for long enough for the other work to be done. Is that an accurate assumption based on the impl below, or is there a better way to schedule some yield point on the runtime. I suppose the other consideration is if the runtime is single threaded or using a pool.
Thanks for any clues here!
use futures::stream::Stream;
use log::info;
use std::task::Poll;
use crate::ticks::Tick;
pub struct TickStream<T: Iterator<Item = Tick> + Unpin> {
iter: T,
current_idx: usize,
waker_count: usize,
}
impl<T: Iterator<Item = Tick> + Unpin> TickStream<T> {
pub(crate) fn new(iter: T, waker_count: usize) -> Self {
Self {
iter,
current_idx: 0,
waker_count,
}
}
}
impl<T: Iterator<Item = Tick> + Unpin> Stream for TickStream<T> {
type Item = Tick;
fn poll_next(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<Option<Self::Item>> {
let s = self.get_mut();
if s.current_idx == s.waker_count {
info!("RETURNING PENDING {}", s.current_idx);
s.current_idx = 0;
cx.waker().wake_by_ref();
Poll::Pending
} else {
s.current_idx += 1;
Poll::Ready(s.iter.next())
}
}
}
Cheers!