Polling a pending Future with a different Waker

Hi,

Context for my question: I'm writing a Future that will receive a completion notification on another thread (due to OS API constraints). My plan is to pass the Waker to that thread and wake() from there. I'm wondering if this is correct usage of the Waker.

Generally, I understand that an executor is allowed to poll() a pending Future at any time, even if the future didn't signal a Waker. What I'm missing is what a pending Future must do if it receives a different Waker in poll().

I assume the Future should ideally replace the previous Waker with the current Waker, and use the latest Waker for notification, but is it required?
In other words, do older Wakers remain operational during the entire life of the future?

For example, is this dummy code correct or can it get stuck because the executor may not poll the future due to using an "expired" Waker?

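use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::thread;
use std::time::Duration;

#[derive(Default)]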
struct SleepyFuture {
    thread: Option<thread::JoinHandle<()>>,
}

impl Future for SleepyFuture {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let thread = self.thread.get_or_insert_with(|| {
            // Capture only the first ever waker.
            let waker = cx.waker().clone();
            thread::spawn(move || {
                waker.wake_by_ref();
                thread::sleep(Duration::from_secs(1));
                waker.wake_by_ref();
                thread::sleep(Duration::from_secs(1));
                waker.wake_by_ref();
                thread::sleep(Duration::from_secs(1));
                waker.wake_by_ref();
            })
        });

        // Here we ignore the current Waker passed to poll(), relying on the
        // thread to call the original waker.
        match thread.is_finished() {
            false => Poll::Pending,
            true => Poll::Ready(()),
        }
    }
}

(This code doesn't get stuck in Tokio, but I'm concerned it's due to current implementation details.)

From the Future::poll documentation: "Note that on multiple calls to poll, only the Waker from the Context passed to the most recent call should be scheduled to receive a wakeup."

The documentation says "should", not "must". I'm wondering whether this is really a "must".

If it is a "must", it implies that waking a Waker from another thread must be very carefully coordinated to avoid losing a notification (for example, when poll() is processing a new Waker in parallel with a notification that arrives at the old Waker on a different thread).

To be precise, it’s not necessary that only the most recent Waker is woken — spurious wakeups (of older wakers or any other) are okay. It is, however, necessary that the most recent waker is woken, or the task may get stuck because it will never be polled again.

A pattern to achieve this is:

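impl Future for ... {
    fn poll(...) {
        // Fast path: the condition is already met.
        if i_am_ready() {
            return Poll::Ready(...);
        }
        // Store the waker from this poll, replacing any older one.
        replace_waker(ctx.waker());
        // Check again: the condition may have been met while the waker
        // was being replaced.
        if i_am_ready() {
            Poll::Ready(...)
        } else {
            Poll::Pending
        }
    }
}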

This way, if the condition is met while the waker is being replaced, there is no lost wakeup because the condition is checked again before poll() returns. (You can use futures_util::task::AtomicWaker to implement the replaceability in a suitable way. A Mutex<Waker> would work too.)
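
For illustration, here is a minimal std-only sketch of that pattern. It assumes a Mutex<Option<Waker>> instead of AtomicWaker, and every name in it (Shared, Completion, CountingWaker, counting_waker, wake_counts_after_waker_change) is made up for the example rather than taken from any library. The future is polled with two different wakers before another thread signals completion, so you can see which one gets woken.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};
use std::{thread, time::Duration};

/// State shared with the notifying thread (all names here are hypothetical).
#[derive(Default)]
pub struct Shared {
    done: AtomicBool,
    waker: Mutex<Option<Waker>>,
}

impl Shared {
    /// Notifier side: publish completion first, then wake the stored waker.
    pub fn complete(&self) {
        self.done.store(true, Ordering::SeqCst);
        let waker = self.waker.lock().unwrap().take(); // lock released here
        if let Some(waker) = waker {
            waker.wake();
        }
    }
}

pub struct Completion(pub Arc<Shared>);

impl Future for Completion {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        let shared = &self.0;
        if shared.done.load(Ordering::SeqCst) {
            return Poll::Ready(());
        }
        // Always store the waker from the most recent poll.
        *shared.waker.lock().unwrap() = Some(cx.waker().clone());
        // Re-check: complete() may have run while the waker was being replaced.
        if shared.done.load(Ordering::SeqCst) {
            Poll::Ready(())
        } else {
            Poll::Pending
        }
    }
}

/// Waker that counts wake-ups and unparks the thread that created it.
pub struct CountingWaker(AtomicUsize, thread::Thread);

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
        self.1.unpark();
    }
}

fn counting_waker() -> (Arc<CountingWaker>, Waker) {
    let inner = Arc::new(CountingWaker(AtomicUsize::new(0), thread::current()));
    (Arc::clone(&inner), Waker::from(inner))
}

/// Polls with one waker, then another, then completes from a second thread.
/// Returns how often each waker was woken.
pub fn wake_counts_after_waker_change() -> (usize, usize) {
    let shared = Arc::new(Shared::default());
    let mut fut = Box::pin(Completion(Arc::clone(&shared)));
    let (first, first_waker) = counting_waker();
    let (second, second_waker) = counting_waker();
    assert!(fut.as_mut().poll(&mut Context::from_waker(&first_waker)).is_pending());
    assert!(fut.as_mut().poll(&mut Context::from_waker(&second_waker)).is_pending());
    let notifier = thread::spawn(move || {
        thread::sleep(Duration::from_millis(20));
        shared.complete();
    });
    // Minimal executor loop: park until the latest waker unparks this thread.
    while fut.as_mut().poll(&mut Context::from_waker(&second_waker)).is_pending() {
        thread::park();
    }
    notifier.join().unwrap();
    (first.0.load(Ordering::SeqCst), second.0.load(Ordering::SeqCst))
}
```

Calling wake_counts_after_waker_change() returns (0, 1): the first waker is never woken because it was replaced, and the most recent one is woken exactly once. The notifier sets the flag before it takes the waker, and poll() stores the waker before it re-checks the flag, so whichever side runs second always sees the other side's write.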

The general way to avoid having to solve this problem yourself is to use a channel future (such as futures_channel::oneshot) to give you a “sending side” for the thread to own, rather than writing your own communication.
