Help on Streams: using `wake_by_ref` before Pending to "yield" for other work

Hello,

My stream impl seems to be blocking and not yielding for other work the way I would like.

I am trying to return Poll::Pending every so often so that my stream yields and other work can run. I "schedule" the waker so that my stream is polled again, but it doesn't seem to yield much at all; it is just polled again immediately. My assumption is that I may need to wait for a short time, with something like tokio::time::sleep, to get it to actually yield long enough for the other work to be done. Is that an accurate assumption based on the impl below, or is there a better way to schedule a yield point on the runtime? I suppose the other consideration is whether the runtime is single-threaded or uses a thread pool.

Thanks for any clues here!

use futures::stream::Stream;
use log::info;
use std::task::Poll;

use crate::ticks::Tick;

pub struct TickStream<T: Iterator<Item = Tick> + Unpin> {
    iter: T,
    current_idx: usize,
    waker_count: usize,
}

impl<T: Iterator<Item = Tick> + Unpin> TickStream<T> {
    pub(crate) fn new(iter: T, waker_count: usize) -> Self {
        Self {
            iter,
            current_idx: 0,
            waker_count,
        }
    }
}

impl<T: Iterator<Item = Tick> + Unpin> Stream for TickStream<T> {
    type Item = Tick;

    fn poll_next(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        let s = self.get_mut();
        if s.current_idx == s.waker_count {
            info!("RETURNING PENDING {}", s.current_idx);
            s.current_idx = 0;
            cx.waker().wake_by_ref();
            Poll::Pending
        } else {
            s.current_idx += 1;
            Poll::Ready(s.iter.next())
        }
    }
}

Cheers!

In most cases, it is not useful to invoke the Waker right before you return Poll::Pending. Instead, a Future (or in this case a Stream) must either return Poll::Ready, or return Poll::Pending and arrange for the Waker to be invoked later, when it is able to make progress. This means storing the Waker somewhere and having some other thread or scheduler wake it; that is how tokio::time::sleep works.
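
As a rough sketch of that "store the Waker and wake it later" pattern, here is a std-only example. The names Delay, Shared, ThreadWaker and block_on are made up for illustration; this is not how tokio::time::sleep is actually implemented (Tokio uses a timer driver rather than one thread per timer), only the same general shape.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};
use std::thread;
use std::time::Duration;

/// State shared between the future and the thread that completes it.
struct Shared {
    done: bool,
    waker: Option<Waker>,
}

/// Hypothetical timer future: becomes ready once `dur` has elapsed.
pub struct Delay {
    shared: Arc<Mutex<Shared>>,
}

impl Delay {
    pub fn new(dur: Duration) -> Self {
        let shared = Arc::new(Mutex::new(Shared { done: false, waker: None }));
        let for_thread = Arc::clone(&shared);
        thread::spawn(move || {
            thread::sleep(dur);
            let mut s = for_thread.lock().unwrap();
            s.done = true;
            // Wake only now, when polling again can actually make progress.
            if let Some(w) = s.waker.take() {
                w.wake();
            }
        });
        Delay { shared }
    }
}

impl Future for Delay {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        let mut s = self.shared.lock().unwrap();
        if s.done {
            Poll::Ready(())
        } else {
            // Store the latest waker instead of invoking it right away.
            s.waker = Some(cx.waker().clone());
            Poll::Pending
        }
    }
}

/// Waker for the toy executor below: unparks the blocked thread.
struct ThreadWaker(thread::Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Minimal single-future executor: parks the thread until it is woken.
pub fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(v) => return v,
            // Sleep until some waker fires; a spurious wake-up just re-polls.
            Poll::Pending => thread::park(),
        }
    }
}
```

With this, block_on(Delay::new(Duration::from_millis(50))) polls the future once, parks, and only polls again after the helper thread has called wake(). Nothing spins in between, which is the difference from waking yourself inside poll.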

What you've done is the executor-unaware way to yield control and allow other tasks to run. However, there is no guarantee that the code calling your stream will in fact decide to schedule another task, because it doesn't know that this is what you intend; the behavior you are currently seeing, where your poll_next() is called again immediately, is entirely valid. Take a look at the implementation of tokio::task::yield_now(): it does what you did as a fallback strategy, but it first tries to contact the runtime explicitly.
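
To illustrate why an immediate re-poll is valid, here is a small std-only sketch. YieldOnce, CountingWaker and poll_to_completion are hypothetical names for this example: YieldOnce does the same wake_by_ref-then-Pending trick as your stream, and the toy poll loop stands in for a caller that has nothing else to run.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical executor-unaware yield: pending once, ready on the next poll.
pub struct YieldOnce {
    yielded: bool,
}

impl YieldOnce {
    pub fn new() -> Self {
        YieldOnce { yielded: false }
    }
}

impl Future for YieldOnce {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.yielded {
            Poll::Ready(())
        } else {
            self.yielded = true;
            // Ask to be polled again, then hand control back to the caller.
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

/// Waker that only counts how many times it was invoked.
struct CountingWaker(AtomicUsize);

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

/// Polls `fut` in a loop until it is ready; returns (polls, wake-ups).
pub fn poll_to_completion<F: Future>(fut: F) -> (usize, usize) {
    let counter = Arc::new(CountingWaker(AtomicUsize::new(0)));
    let waker = Waker::from(Arc::clone(&counter));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    let mut polls = 0;
    loop {
        polls += 1;
        if fut.as_mut().poll(&mut cx).is_ready() {
            return (polls, counter.0.load(Ordering::SeqCst));
        }
        // No other task exists here, so the "yield" has nothing to give way
        // to and the future is simply polled again straight away.
    }
}
```

Here poll_to_completion(YieldOnce::new()) polls twice back to back: the Pending in between gave the caller a chance to do something else, but nothing obliges it to. Inside an async block running on Tokio you can write tokio::task::yield_now().await instead, so the runtime knows a yield is intended.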

Thanks Kevin! This makes good sense.

Cheers!
Dave
