Is async "wake one" safe in a multithreaded runtime?

I asked this question previously, but I don't think I was clear and concise enough, so I'm asking again with a more concrete example and less verbosity.

My basic question is: Is it possible to implement the equivalent of Condvar::notify_one() in the async world, using wakers?

The answer is obviously "yes" (because at a high level that's more or less what Waker::wake() does); however, there are a few assumptions baked into this question:

  • A multi-threaded (tokio) runtime is being used.
  • The corresponding Futures are waiting for a resource that can be waited on from multiple tasks at once.
  • The Futures may be used in a select!, so they can be cancelled when another arm completes first.
  • A single Future is woken up at a time.

For purposes of illustration, let's assume we're making an MPMC channel Receiver. The Receiver::recv() function returns a RecvFut that implements Future.

We want each RecvFut's Waker to be "addressable", because when a new message is sent by a Sender, it wants to wake up a single RecvFut (if there is one). This is done by assigning each RecvFut a unique usize. This usize is added to a shared IndexMap where the value is the Waker of the RecvFut. (The property of being addressable also allows the RecvFut's Drop implementation to remove itself from the IndexMap).
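A minimal sketch of that bookkeeping, using only std: BTreeMap stands in for IndexMap, and Registry, Registration and park() are hypothetical names for the part of RecvFut that owns the id. park() is what poll() would call before returning Poll::Pending, and Drop removes the entry by id, which is what lets a cancelled RecvFut clean up after itself.

```rust
use std::collections::BTreeMap;
use std::sync::{Arc, Mutex};
use std::task::{Wake, Waker};

/// Hypothetical shared registry of parked receivers.
/// BTreeMap is a std-only stand-in for the IndexMap in the question.
#[derive(Default)]
struct Registry {
    receiver_wakers: BTreeMap<usize, Waker>,
    next_id: usize,
}

/// Hypothetical "addressable" part of a RecvFut: a unique id plus a
/// handle to the shared registry.
struct Registration {
    registry: Arc<Mutex<Registry>>,
    id: usize,
}

impl Registration {
    fn new(registry: &Arc<Mutex<Registry>>) -> Self {
        let mut reg = registry.lock().unwrap();
        let id = reg.next_id;
        reg.next_id += 1;
        Registration { registry: Arc::clone(registry), id }
    }

    /// What RecvFut::poll would call just before returning Poll::Pending.
    fn park(&self, waker: &Waker) {
        let mut reg = self.registry.lock().unwrap();
        // Only store a new Waker if the task is polled with a different one.
        let stale = reg
            .receiver_wakers
            .get(&self.id)
            .map_or(true, |w| !w.will_wake(waker));
        if stale {
            reg.receiver_wakers.insert(self.id, waker.clone());
        }
    }
}

impl Drop for Registration {
    /// A cancelled (dropped) RecvFut removes its own entry by id.
    fn drop(&mut self) {
        if let Ok(mut reg) = self.registry.lock() {
            reg.receiver_wakers.remove(&self.id);
        }
    }
}

/// Waker that does nothing; enough to exercise the registry.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

fn main() {
    let registry = Arc::new(Mutex::new(Registry::default()));
    let waker = Waker::from(Arc::new(NoopWake));
    let a = Registration::new(&registry);
    let b = Registration::new(&registry);
    a.park(&waker);
    b.park(&waker);
    drop(a); // e.g. the select! arm holding `a` lost the race
    let ids: Vec<usize> = registry.lock().unwrap().receiver_wakers.keys().copied().collect();
    println!("still parked: {:?}", ids);
}
```

Comparing wakers with will_wake() just avoids cloning a new Waker on every poll when the task hasn't changed; the question doesn't depend on it.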

My question more specifically concerns the situation where a Sender has just placed a new message in the channel's internal queue and is about to wake up a single Receiver:

Sender::send():

let mut inner = self.inner.lock();
inner.queue.push_back(msg);
// Pick a Waker.
// This is meant to approximate Condvar::notify_one().
if let Some((_id, waker)) = inner.receiver_wakers.pop() {
  waker.wake();
}

A receiver task is doing something like this:

loop {
  select! {
    msg = rx.recv() => {
      // Process received message
    }
    _ = event.wait() => {
      // Abort this task
      break;
    }
  }
}

My assumption is that the "wake one" operation is not safe, because the waker.wake() event can get "lost": just after a Future/Waker has been picked (i.e. after receiver_wakers.pop() has been called in Sender::send()), the corresponding Future can be cancelled because the event.wait() arm became ready first, so select! drops the RecvFut before it is polled again.
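The lost wakeup can be reproduced deterministically without a runtime. This is a single-threaded model rather than tokio code: Inner, cancel() and the Counter waker are hypothetical, the explicit cancel(1) call plays the role of RecvFut's Drop, and BTreeMap::pop_last() approximates IndexMap::pop() (both remove the most recently added id as long as ids only grow).

```rust
use std::collections::{BTreeMap, VecDeque};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Wake, Waker};

/// Waker that counts how many times it was woken.
struct Counter(AtomicUsize);
impl Wake for Counter {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}
impl Counter {
    fn get(&self) -> usize {
        self.0.load(Ordering::SeqCst)
    }
}
fn counting_waker() -> (Arc<Counter>, Waker) {
    let c = Arc::new(Counter(AtomicUsize::new(0)));
    (Arc::clone(&c), Waker::from(c))
}

/// Hypothetical single-threaded model of the channel state.
#[derive(Default)]
struct Inner {
    queue: VecDeque<u32>,
    receiver_wakers: BTreeMap<usize, Waker>, // stand-in for IndexMap
}

impl Inner {
    /// The "wake one" send from the question.
    fn send(&mut self, msg: u32) {
        self.queue.push_back(msg);
        // pop_last() approximates IndexMap::pop(): it removes the newest id.
        if let Some((_id, waker)) = self.receiver_wakers.pop_last() {
            waker.wake();
        }
    }

    /// Stands in for RecvFut's Drop: remove our entry if it is still there.
    fn cancel(&mut self, id: usize) {
        self.receiver_wakers.remove(&id);
    }
}

fn main() {
    let mut inner = Inner::default();
    let (a, a_waker) = counting_waker();
    let (b, b_waker) = counting_waker();
    inner.receiver_wakers.insert(0, a_waker); // task A parked in rx.recv()
    inner.receiver_wakers.insert(1, b_waker); // task B parked in rx.recv()

    inner.send(42); // picks and wakes B only
    // B's select! sees event.wait() ready and drops its RecvFut unpolled.
    inner.cancel(1);

    // A was never woken, yet the message is still queued.
    println!("A woken: {}, B woken: {}, queued: {}", a.get(), b.get(), inner.queue.len());
}
```

Once B is dropped, nothing wakes A for this message; it is only picked up if some later send() happens to wake A.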

Is my assumption correct that a Waker::wake() "event" can get lost if its intended recipient is "preempted" by another Future in a select!?

The workaround is simple; just wake up all Futures and let the ones that are starved return Pending again. But I'm curious if there are any mitigating factors that make the "wake one" safer than I think it is.
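A sketch of the wake-all workaround in the same hypothetical model (Inner, send() and Counter are illustrative names; BTreeMap stands in for IndexMap). The wakers are collected under the lock and woken after it is released, so a receiver polled on another worker thread does not find the lock still held.

```rust
use std::collections::{BTreeMap, VecDeque};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Wake, Waker};

/// Waker that counts how many times it was woken.
struct Counter(AtomicUsize);
impl Wake for Counter {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}
impl Counter {
    fn get(&self) -> usize {
        self.0.load(Ordering::SeqCst)
    }
}
fn counting_waker() -> (Arc<Counter>, Waker) {
    let c = Arc::new(Counter(AtomicUsize::new(0)));
    (Arc::clone(&c), Waker::from(c))
}

/// Hypothetical channel state.
#[derive(Default)]
struct Inner {
    queue: VecDeque<u32>,
    receiver_wakers: BTreeMap<usize, Waker>, // stand-in for IndexMap
}

/// "Wake all" send: every parked receiver is woken. Receivers that lose the
/// race for the message re-register their Waker and return Poll::Pending.
fn send(shared: &Mutex<Inner>, msg: u32) {
    let wakers: Vec<Waker> = {
        let mut inner = shared.lock().unwrap();
        inner.queue.push_back(msg);
        let parked = std::mem::take(&mut inner.receiver_wakers);
        parked.into_values().collect()
    };
    // Wake outside the lock.
    for waker in wakers {
        waker.wake();
    }
}

fn main() {
    let shared = Mutex::new(Inner::default());
    let (a, a_waker) = counting_waker();
    let (b, b_waker) = counting_waker();
    shared.lock().unwrap().receiver_wakers.insert(0, a_waker);
    shared.lock().unwrap().receiver_wakers.insert(1, b_waker);

    send(&shared, 42);
    // Even if B's select! now drops its RecvFut, A was woken too and will
    // find the message on its next poll.
    println!("A woken: {}, B woken: {}", a.get(), b.get());
}
```

The price is a thundering herd: every parked receiver gets polled, and all but one go back to Poll::Pending.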

Yes, that is correct. What you have to do in addition is this: if the RecvFut is dropped after being picked but before it is polled again, retry delivering the wakeup to another receiver.
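One way to implement that retry, sketched with std only: messages are u32 for brevity, BTreeMap/BTreeSet stand in for IndexMap, and notified, notify_one(), recv(), poll_once() and Counter are hypothetical names. send() records which receiver it picked, poll() clears that mark (a poll means the wakeup was observed), and Drop forwards the wakeup to another parked receiver if the mark is still set and a message is still queued. Taking the message only happens in the poll() that returns Poll::Ready, so a cancelled RecvFut never consumes anything.

```rust
use std::collections::{BTreeMap, BTreeSet, VecDeque};
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

#[derive(Default)]
struct Inner {
    queue: VecDeque<u32>,
    receiver_wakers: BTreeMap<usize, Waker>, // stand-in for IndexMap
    notified: BTreeSet<usize>, // picked by send(), not polled since
    next_id: usize,
}
type Shared = Arc<Mutex<Inner>>; // hypothetical shared channel state

impl Inner {
    /// "Wake one": mark one parked receiver as notified and return its Waker.
    fn notify_one(&mut self) -> Option<Waker> {
        let (id, waker) = self.receiver_wakers.pop_last()?;
        self.notified.insert(id);
        Some(waker)
    }
}

fn send(shared: &Shared, msg: u32) {
    let mut inner = shared.lock().unwrap();
    inner.queue.push_back(msg);
    let waker = inner.notify_one();
    drop(inner); // wake after releasing the lock
    if let Some(w) = waker {
        w.wake();
    }
}

struct RecvFut { shared: Shared, id: usize }
fn recv(shared: &Shared) -> RecvFut {
    let mut inner = shared.lock().unwrap();
    let id = inner.next_id;
    inner.next_id += 1;
    RecvFut { shared: Arc::clone(shared), id }
}

impl Future for RecvFut {
    type Output = u32;
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<u32> {
        let this = self.get_mut();
        let mut inner = this.shared.lock().unwrap();
        // Being polled means any wakeup aimed at us was observed.
        inner.notified.remove(&this.id);
        // Irreversible work (taking a message) only happens on Ready.
        if let Some(msg) = inner.queue.pop_front() {
            inner.receiver_wakers.remove(&this.id);
            return Poll::Ready(msg);
        }
        inner.receiver_wakers.insert(this.id, cx.waker().clone());
        Poll::Pending
    }
}

impl Drop for RecvFut {
    fn drop(&mut self) {
        let Ok(mut inner) = self.shared.lock() else { return };
        inner.receiver_wakers.remove(&self.id);
        // Woken but cancelled before polling: forward if a message is still queued.
        let forward = if inner.notified.remove(&self.id) && !inner.queue.is_empty() {
            inner.notify_one()
        } else {
            None
        };
        drop(inner); // release the lock before waking
        if let Some(w) = forward {
            w.wake();
        }
    }
}

struct Counter(AtomicUsize); // counts how often its Waker was woken
impl Wake for Counter {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}
fn counting_waker() -> (Arc<Counter>, Waker) {
    let c = Arc::new(Counter(AtomicUsize::new(0)));
    (Arc::clone(&c), Waker::from(c))
}
fn poll_once(fut: &mut RecvFut, waker: &Waker) -> Poll<u32> {
    Pin::new(fut).poll(&mut Context::from_waker(waker))
}

fn main() {
    let shared = Shared::default();
    let ((a, wa), (_b, wb)) = (counting_waker(), counting_waker());
    let (mut fut_a, mut fut_b) = (recv(&shared), recv(&shared));
    let _ = (poll_once(&mut fut_a, &wa), poll_once(&mut fut_b, &wb)); // both park
    send(&shared, 42); // "wake one" picks B (highest id)
    drop(fut_b); // B's select! took another arm: the wakeup is forwarded to A
    println!("A woken: {}, then polls {:?}", a.0.load(Ordering::SeqCst), poll_once(&mut fut_a, &wa));
}
```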


P.S. In general, to be “cancellation safe” you have to avoid thinking of calling wake() as a meaningful “state transition” in itself; whatever the effect you’re looking for is, it hasn’t actually happened until the corresponding poll() has been called.

But once that happens, poll()s have a certain useful “transaction” character to them: each time a Future::poll() (or Stream::poll_next(), etc.) function is executing, that is an opportunity to decide whether the future is going to complete now, or stay pending and possibly get cancelled, but you know it definitely can’t get cancelled right now because both Future::poll() and Drop::drop() are &mut self methods. So, you can correctly do irreversible things in a poll() that you wouldn’t want to do if cancelled, provided that you plan to return Poll::Ready from that particular call. If a caller polls you and then drops your Ready response, that’s their problem, not yours; it’s not a cancellation.


The challenge is that this could result in the wake getting forwarded to a waker that was created after the notify_one call.
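One possible mitigation, assuming ids are handed out in increasing order when a waiter registers: remember the id counter at notify_one() time as a cutoff, and forward only to waiters whose id is below it. The hypothetical Waiters type below uses plain register/consume/cancel methods in place of a real poll()/Drop pair, and BTreeMap::range() finds the newest eligible waiter.

```rust
use std::collections::BTreeMap;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Wake, Waker};

/// Hypothetical waiter list with Condvar-like "notify one" semantics.
/// Ids grow monotonically, so "id < cutoff" means "existed at notify time".
#[derive(Default)]
struct Waiters {
    waiting: BTreeMap<usize, Waker>,
    notified: BTreeMap<usize, usize>, // notified id -> cutoff at notify time
    next_id: usize,
}

impl Waiters {
    fn register(&mut self, waker: Waker) -> usize {
        let id = self.next_id;
        self.next_id += 1;
        self.waiting.insert(id, waker);
        id
    }

    fn notify_one(&mut self) -> Option<Waker> {
        let cutoff = self.next_id;
        self.notify_before(cutoff)
    }

    /// Notify the newest waiting id below `cutoff`, carrying the cutoff along.
    fn notify_before(&mut self, cutoff: usize) -> Option<Waker> {
        let id = *self.waiting.range(..cutoff).next_back()?.0;
        self.notified.insert(id, cutoff);
        self.waiting.remove(&id)
    }

    /// Called from the waiter's poll(): true if it had been notified.
    fn consume(&mut self, id: usize) -> bool {
        self.notified.remove(&id).is_some()
    }

    /// Called from the waiter's Drop: forward an unobserved notification,
    /// but only to waiters that already existed when notify_one() ran.
    fn cancel(&mut self, id: usize) -> Option<Waker> {
        self.waiting.remove(&id);
        let cutoff = self.notified.remove(&id)?;
        self.notify_before(cutoff)
    }
}

/// Waker that counts how many times it was woken.
struct Counter(AtomicUsize);
impl Wake for Counter {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}
impl Counter {
    fn get(&self) -> usize {
        self.0.load(Ordering::SeqCst)
    }
}
fn counting_waker() -> (Arc<Counter>, Waker) {
    let c = Arc::new(Counter(AtomicUsize::new(0)));
    (Arc::clone(&c), Waker::from(c))
}

fn main() {
    let mut w = Waiters::default();
    let (a, wa) = counting_waker();
    let (b, wb) = counting_waker();
    let (c, wc) = counting_waker();
    let id_a = w.register(wa);
    let id_b = w.register(wb);
    if let Some(waker) = w.notify_one() {
        waker.wake(); // picks B; cutoff = 2
    }
    let _id_c = w.register(wc); // C arrives after notify_one()
    if let Some(waker) = w.cancel(id_b) {
        waker.wake(); // B cancelled: forwarded to A, never to C
    }
    println!("A: {}, B: {}, C: {}, A notified: {}", a.get(), b.get(), c.get(), w.consume(id_a));
}
```

If no eligible waiter is left, the notification is simply dropped. That matches a strict reading of Condvar::notify_one(), but for a channel that still holds a message, forwarding to a newer receiver is harmless, so whether the cutoff is worth it depends on the primitive.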
