I asked this question previously, but I don't think I was clear & concise enough. I'm going to ask again, but provide a more concrete example and be less verbose.
My basic question is: Is it possible to implement the equivalent of Condvar::notify_one()
in the async world, using wakers?
The answer is obviously "yes" (because at a high level that's more or less what a Waker::wake()
does), however there are a few assumptions baked into this question:
- A multi-threaded (tokio) runtime is being used.
- The corresponding
Future
s are waiting for a resource that can be waited on from multiple tasks at once. - The
Future
s are assumed to be used in aselect!
, and thus can be cancelled by another arm. - A single
Future
is woken up at a time.
For purposes of illustration, let's assume we're making a MPMC channel Receiver
. The Receiver::recv()
function returns a RecvFut
that implements Future
.
We want each RecvFut
's Waker
to be "addressable", because when a new message is sent by a Sender
, it wants to wake up a single RecvFut
(if there is one). This is done by assigning each RecvFut
a unique usize
. This usize
is added to a shared IndexMap
where the value is the Waker
of the RecvFut
. (The property of being addressable also allows the RecvFut
's Drop
implementation to remove itself from the IndexMap
).
My question more specifically concerns the situation where a Sender
has just placed a new message in the channel's internal queue and is about to wake up a single Receiver
:
Sender::send()
:
let mut inner = self.inner.lock();
inner.queue.push_back(msg);
// Pick a Waker.
// This is meant to approximate Condvar::notify_one().
if let Some((_id, waker)) = inner.receiver_wakers.pop() {
waker.wake();
}
A receiver task is doing something like to this:
loop {
select! {
msg = rx.recv() => {
// Process received message
}
_ = event.wait() => {
// Abort this task
break;
}
}
}
My assumption is that the "wake one" operation is not safe, because the waker.wake()
event can get "lost": Just after a Future
/Waker
has been picked (i.e. the reciver_wakers.pop()
has been called in Sender::send()
), that corresponding Future
can be cancelled due to event.wait()
arm being woken up just before rx.recv()
would have been woken up.
Is my assumption correct; that a Waker::wake()
"event" can get lost, if its intended recipient was "preempted" by another Future
in a select
?
The workaround is simple; just wake up all Futures
and let the ones that are starved return Pending
again. But I'm curious if there are any mitigating factors that make the "wake one" safer than I think it is.