tl;dr: How can one simulate Condvar::notify_one() to wake up a single Future without risking that the wake request gets lost because the Future is being cancelled?
For purposes of illustration, assume we're working on an mpmc channel that supports senders and receivers on both threads and async tasks. The structure shared between the senders and receivers is a VecDeque behind a Mutex.
For the threaded case, the receiver locks the shared queue, and if there are any elements on the queue, it pops one off and returns it. Otherwise, it waits on a Condvar (also shared between the senders and receivers). When a sender adds an element to the queue, it can choose to wake either one thread waiting on the Condvar or all of them. Let's assume that only one element is added at a time, so we use notify_one(), which wakes up a single waiting receiver that will pick up the new element.
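A minimal std-only sketch of the threaded case as described, assuming a single Mutex<VecDeque<T>> plus a Condvar; the names Shared, send, recv and run are illustrative. The receiver re-checks the queue in a loop after every wakeup, because a woken thread may find that another receiver got to the element first (or may have been woken spuriously).

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

// Illustrative shared state for the threaded case.
struct Shared<T> {
    q: Mutex<VecDeque<T>>,
    signal: Condvar,
}

impl<T> Shared<T> {
    fn new() -> Self {
        Shared { q: Mutex::new(VecDeque::new()), signal: Condvar::new() }
    }

    // Push one element and wake at most one thread blocked in recv().
    fn send(&self, e: T) {
        let mut q = self.q.lock().unwrap();
        q.push_back(e);
        self.signal.notify_one();
    }

    // Block until an element is available, then pop it.
    fn recv(&self) -> T {
        let mut q = self.q.lock().unwrap();
        loop {
            if let Some(e) = q.pop_front() {
                return e;
            }
            // wait() releases the lock while asleep and re-acquires it on wakeup;
            // looping handles spurious wakeups and elements taken by other receivers.
            q = self.signal.wait(q).unwrap();
        }
    }
}

// Spawn `n` receiver threads, send 0..n, and collect what they received (sorted).
fn run(n: usize) -> Vec<usize> {
    let sh: Arc<Shared<usize>> = Arc::new(Shared::new());
    let handles: Vec<_> = (0..n)
        .map(|_| {
            let sh = Arc::clone(&sh);
            thread::spawn(move || sh.recv())
        })
        .collect();
    for i in 0..n {
        sh.send(i);
    }
    let mut got: Vec<usize> = handles.into_iter().map(|h| h.join().unwrap()).collect();
    got.sort();
    got
}

fn main() {
    println!("{:?}", run(4)); // prints [0, 1, 2, 3]
}
```

Because the queue is checked under the lock before waiting, a notify_one() that fires while no thread is waiting is not a problem: the element simply sits in the queue until the next receiver locks it.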
For the async/task case, when a receive is issued, a Future is created whose poll() locks the queue and checks whether there are any elements. If there are, it pops one and returns it using Poll::Ready. But if there aren't, it needs to store its Waker somewhere so that a sender can trigger it once an element is added to the queue, and then return Poll::Pending. The Waker could be stored in an Option<Waker> in the same structure that is shared between the senders and receivers.
However, we're supporting multiple receivers, so we need to be able to store multiple Wakers. We can use a Vec instead, so that when a sender adds a new element to the queue, it can pop a Waker off the Vec and call wake() on it. That supports multiple receivers, with the ability to wake a single receiver when a single element is added to the queue.
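A minimal sketch of the Vec<Waker> approach, assuming no real executor: the futures are polled by hand with test-only wakers (built via std::task::Wake) that count how often they are woken. Inner, RecvFuture, send, Counter and demo are hypothetical names chosen for illustration.

```rust
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

struct Inner<T> {
    q: VecDeque<T>,
    rx_wakers: Vec<Waker>, // one entry per pending receive
}

struct RecvFuture<T> {
    sh: Arc<Mutex<Inner<T>>>,
}

impl<T> Future for RecvFuture<T> {
    type Output = T;
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
        let mut inner = self.sh.lock().unwrap();
        match inner.q.pop_front() {
            Some(e) => Poll::Ready(e),
            None => {
                // Park: leave our Waker for a sender to find.
                inner.rx_wakers.push(cx.waker().clone());
                Poll::Pending
            }
        }
    }
}

fn send<T>(sh: &Mutex<Inner<T>>, e: T) {
    let mut inner = sh.lock().unwrap();
    inner.q.push_back(e);
    // Like Condvar::notify_one(): take exactly one parked Waker, if any.
    let waker = inner.rx_wakers.pop();
    drop(inner); // release the lock before waking
    if let Some(w) = waker {
        w.wake();
    }
}

// Test-only Waker that counts how many times it was woken.
#[derive(Default)]
struct Counter(AtomicUsize);
impl Wake for Counter {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

// Two receivers park, one element is sent: returns (wakes of A, wakes of B).
fn demo() -> (usize, usize) {
    let sh: Arc<Mutex<Inner<u32>>> =
        Arc::new(Mutex::new(Inner { q: VecDeque::new(), rx_wakers: Vec::new() }));
    let (ca, cb) = (Arc::new(Counter::default()), Arc::new(Counter::default()));
    let (wa, wb) = (Waker::from(ca.clone()), Waker::from(cb.clone()));
    let mut a = RecvFuture { sh: Arc::clone(&sh) };
    let mut b = RecvFuture { sh: Arc::clone(&sh) };
    assert!(Pin::new(&mut a).poll(&mut Context::from_waker(&wa)).is_pending());
    assert!(Pin::new(&mut b).poll(&mut Context::from_waker(&wb)).is_pending());
    send(&sh, 42);
    (ca.0.load(Ordering::SeqCst), cb.0.load(Ordering::SeqCst))
}

fn main() {
    println!("{:?}", demo()); // Vec::pop is LIFO, so only B is woken: (0, 1)
}
```

Vec::pop hands the wakeup to the most recently parked receiver; using a VecDeque with pop_front instead would make it FIFO.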
However (again), this doesn't take cancellation into account. If a receiver's Future is waiting in a select!{} and another Future in the select!{} resolves first, then the receiver's Future is dropped, but the Waker it stored in the Vec stays behind. To take this into account, the Future that is used to wait for an element to arrive on the queue has a Drop implementation that removes its Waker from the Vec... but how? It needs to be able to identify the Waker to be removed. To solve this, we add some means of generating a unique identifier, and use a map from each identifier to its Waker. We'll use an IndexMap, because it offers a pop(), which means a sender can easily wake up a single receiver, mimicking the behavior of Condvar::notify_one().
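The id-keyed registry can also be sketched with the standard library alone. IndexMap comes from the external indexmap crate, so this sketch swaps in a BTreeMap<u64, Waker>; because ids are handed out in increasing order, BTreeMap::pop_first wakes the oldest waiter, whereas IndexMap::pop removes the last entry in the map's order. WakerRegistry, register, deregister and pop_one are hypothetical names.

```rust
use std::collections::BTreeMap;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Wake, Waker};

// Hypothetical registry of parked receivers, keyed by a unique id.
#[derive(Default)]
struct WakerRegistry {
    next_id: u64,
    wakers: BTreeMap<u64, Waker>,
}

impl WakerRegistry {
    // Called from poll() before returning Pending; the future keeps the id.
    fn register(&mut self, w: Waker) -> u64 {
        let id = self.next_id;
        self.next_id += 1;
        self.wakers.insert(id, w);
        id
    }

    // Called from the future's Drop; true if the entry was still there.
    fn deregister(&mut self, id: u64) -> bool {
        self.wakers.remove(&id).is_some()
    }

    // notify_one(): take the oldest parked Waker; the caller wakes it after unlocking.
    fn pop_one(&mut self) -> Option<Waker> {
        self.wakers.pop_first().map(|(_id, w)| w)
    }
}

// Test-only Waker that counts how many times it was woken.
#[derive(Default)]
struct Counter(AtomicUsize);
impl Wake for Counter {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

// A and B park, A is cancelled (deregisters), then one element arrives.
fn demo() -> (usize, usize) {
    let reg = Mutex::new(WakerRegistry::default());
    let (ca, cb) = (Arc::new(Counter::default()), Arc::new(Counter::default()));
    let id_a = reg.lock().unwrap().register(Waker::from(ca.clone()));
    let _id_b = reg.lock().unwrap().register(Waker::from(cb.clone()));
    assert!(reg.lock().unwrap().deregister(id_a)); // A's Drop
    let next = reg.lock().unwrap().pop_one(); // sender side; lock released here
    if let Some(w) = next {
        w.wake();
    }
    (ca.0.load(Ordering::SeqCst), cb.0.load(Ordering::SeqCst))
}

fn main() {
    println!("{:?}", demo()); // the wake goes to B, not to the cancelled A: (0, 1)
}
```

pop_one returns the Waker rather than waking it, so that a sender can release the Mutex before calling wake().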
In a nutshell, this is what we've ended up with:
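```rust
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Condvar, Mutex};
use std::task::{Context, Poll, Waker};

use indexmap::IndexMap; // from the indexmap crate

struct Inner<T> {
    q: VecDeque<T>,
    rx_wakers: IndexMap<usize, Waker>, // For waking up async tasks
    next_id: usize,                    // Source of unique receiver ids
}

struct Shared<T> {
    inner: Mutex<Inner<T>>,
    signal: Condvar, // For waking up threads
}

struct RecvFuture<T> {
    sh: Arc<Shared<T>>,
    id: Option<usize>,
}

impl<T> Future for RecvFuture<T> {
    type Output = T;

    fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<T> {
        let this = self.get_mut(); // RecvFuture is Unpin
        let mut inner = this.sh.inner.lock().unwrap();
        if let Some(e) = inner.q.pop_front() {
            // Got an element; don't leave a stale Waker behind for senders to pick
            if let Some(id) = this.id.take() {
                inner.rx_wakers.swap_remove(&id);
            }
            return Poll::Ready(e);
        }
        // Need to put Future in Pending state; reuse our id if polled again
        if this.id.is_none() {
            this.id = Some(inner.next_id);
            inner.next_id += 1;
        }
        let id = this.id.unwrap();
        inner.rx_wakers.insert(id, ctx.waker().clone());
        Poll::Pending
    }
}

impl<T> Drop for RecvFuture<T> {
    fn drop(&mut self) {
        if let Some(id) = self.id {
            // Has returned Pending at some point, so remove our Waker
            let mut inner = self.sh.inner.lock().unwrap();
            let _ = inner.rx_wakers.swap_remove(&id);
        }
    }
}

struct Sender<T> {
    sh: Arc<Shared<T>>,
}

impl<T> Sender<T> {
    fn send(&self, e: T) {
        let mut inner = self.sh.inner.lock().unwrap();
        inner.q.push_back(e);
        // Wake up a single blocked receiver thread, if any
        self.sh.signal.notify_one();
        // Wake up a single, arbitrary, receiver Future, if there are any waiting
        if let Some((_id, waker)) = inner.rx_wakers.pop() {
            waker.wake();
        }
    }
}
```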
This is all leading up to the question: given a multi-threaded executor, assume the following state:
- A RecvFuture in a select!{} has been polled once and returned Poll::Pending, so it has added itself to rx_wakers.
- This RecvFuture is being cancelled (due to another arm in the select!{} resolving).
Is there a time span between the cancellation and the Future being dropped in which another thread/task could take the Future's Waker out of rx_wakers and call .wake() on it, with that wakeup getting lost because the Future is in the process of being dropped by the executor?
(I've been so hyper-focused on the async case that I didn't realize the trivial answer is "yes, definitely, because of non-async threads".)
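The interleaving can be replayed deterministically without any executor at all, which suggests the window lies in the channel's protocol rather than in the executor. This sketch assumes a std-only stand-in for the code above (BTreeMap instead of IndexMap, a u32 payload, hand-polled futures with counting test wakers; all names are illustrative): receivers A and B both park, a sender pushes one element and takes A's Waker, and A is then dropped before it is polled again, as happens when another select!{} arm wins. A's Drop finds nothing left to remove, so the element stays queued while B is never woken.

```rust
use std::collections::{BTreeMap, VecDeque};
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

struct Inner {
    q: VecDeque<u32>,
    next_id: u64,
    rx_wakers: BTreeMap<u64, Waker>,
}

struct RecvFuture {
    sh: Arc<Mutex<Inner>>,
    id: Option<u64>,
}

impl Future for RecvFuture {
    type Output = u32;
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<u32> {
        let this = self.get_mut();
        let mut inner = this.sh.lock().unwrap();
        if let Some(old) = this.id.take() {
            inner.rx_wakers.remove(&old); // clear any earlier registration
        }
        if let Some(e) = inner.q.pop_front() {
            return Poll::Ready(e);
        }
        let id = inner.next_id;
        inner.next_id += 1;
        inner.rx_wakers.insert(id, cx.waker().clone());
        this.id = Some(id);
        Poll::Pending
    }
}

impl Drop for RecvFuture {
    fn drop(&mut self) {
        if let Some(id) = self.id {
            // Remove our Waker, if a sender hasn't already taken it.
            self.sh.lock().unwrap().rx_wakers.remove(&id);
        }
    }
}

fn send(sh: &Mutex<Inner>, e: u32) {
    let mut inner = sh.lock().unwrap();
    inner.q.push_back(e);
    let next = inner.rx_wakers.pop_first(); // notify_one: oldest waiter
    drop(inner);
    if let Some((_id, w)) = next {
        w.wake();
    }
}

#[derive(Default)]
struct Counter(AtomicUsize); // test-only Waker that counts wakeups
impl Wake for Counter {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

// Returns (elements still queued, times B was woken).
fn demo() -> (usize, usize) {
    let sh = Arc::new(Mutex::new(Inner { q: VecDeque::new(), next_id: 0, rx_wakers: BTreeMap::new() }));
    let wa = Waker::from(Arc::new(Counter::default()));
    let cb = Arc::new(Counter::default());
    let wb = Waker::from(cb.clone());
    let mut a = RecvFuture { sh: Arc::clone(&sh), id: None };
    let mut b = RecvFuture { sh: Arc::clone(&sh), id: None };
    assert!(Pin::new(&mut a).poll(&mut Context::from_waker(&wa)).is_pending()); // id 0
    assert!(Pin::new(&mut b).poll(&mut Context::from_waker(&wb)).is_pending()); // id 1
    send(&sh, 7); // takes and wakes A's Waker (oldest id)
    drop(a); // ...but A is cancelled before being polled again
    let queued = sh.lock().unwrap().q.len();
    (queued, cb.0.load(Ordering::SeqCst))
}

fn main() {
    println!("{:?}", demo()); // (1, 0): an element is waiting, yet B sleeps
}
```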
But regardless, is there anything in an executor that will ensure this isn't a problem (for the async case)? I have a gut feeling that this is something a multi-threaded executor can't really solve. I also realize executors have a lot of degrees of freedom, so I'm asking specifically about tokio here.
In writing this I've convinced myself that this is an issue that needs to be handled. But what solutions are there? One brute-force solution would be for a sender to wake up all wakers whenever a new element is added (and simply let all but one revert to Pending naturally). I imagine there's some magic one could do in the Drop implementation to determine whether the RecvFuture has been selected to pop an element; if it ends up in Drop before being able to pop that element, it could simply trigger another Waker (if available).
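A minimal sketch of the Drop-based idea, under the same kind of std-only assumptions (BTreeMap<u64, Waker> in place of IndexMap, a u32 payload, hand-polled futures with counting test wakers; all names are hypothetical). The key invariant is that only a sender ever removes another waiter's entry, so a missing entry doubles as "this waiter was notified". In Drop, a future that still holds an id but whose entry is gone knows it was selected; if the queue is non-empty, it passes the wakeup on to the next parked waiter. The brute-force alternative would instead drain rx_wakers in send and wake every entry.

```rust
use std::collections::{BTreeMap, VecDeque};
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

struct Inner {
    q: VecDeque<u32>,
    next_id: u64,
    rx_wakers: BTreeMap<u64, Waker>, // removing an entry == notifying that waiter
}

struct RecvFuture {
    sh: Arc<Mutex<Inner>>,
    id: Option<u64>, // Some(..) while parked or holding an unconsumed notification
}

impl Future for RecvFuture {
    type Output = u32;
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<u32> {
        let this = self.get_mut();
        let mut inner = this.sh.lock().unwrap();
        if let Some(old) = this.id.take() {
            inner.rx_wakers.remove(&old); // clear any earlier registration
        }
        if let Some(e) = inner.q.pop_front() {
            return Poll::Ready(e); // id is None now, so Drop won't forward
        }
        let id = inner.next_id;
        inner.next_id += 1;
        inner.rx_wakers.insert(id, cx.waker().clone());
        this.id = Some(id);
        Poll::Pending
    }
}

impl Drop for RecvFuture {
    fn drop(&mut self) {
        let Some(id) = self.id else { return };
        let mut inner = self.sh.lock().unwrap();
        if inner.rx_wakers.remove(&id).is_some() {
            return; // never notified: deregistering is enough
        }
        // A sender took our Waker but we never popped an element:
        // forward the notification so the element isn't stranded.
        let next = if inner.q.is_empty() { None } else { inner.rx_wakers.pop_first() };
        drop(inner);
        if let Some((_id, w)) = next {
            w.wake();
        }
    }
}

fn send(sh: &Mutex<Inner>, e: u32) {
    let mut inner = sh.lock().unwrap();
    inner.q.push_back(e);
    let next = inner.rx_wakers.pop_first(); // notify_one: oldest waiter
    drop(inner);
    if let Some((_id, w)) = next {
        w.wake();
    }
}

#[derive(Default)]
struct Counter(AtomicUsize); // test-only Waker that counts wakeups
impl Wake for Counter {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

// A and B park, a send wakes A, A is cancelled before re-polling.
// Returns (times B was woken, what B's next poll yields).
fn demo() -> (usize, Option<u32>) {
    let sh = Arc::new(Mutex::new(Inner { q: VecDeque::new(), next_id: 0, rx_wakers: BTreeMap::new() }));
    let wa = Waker::from(Arc::new(Counter::default()));
    let cb = Arc::new(Counter::default());
    let wb = Waker::from(cb.clone());
    let mut a = RecvFuture { sh: Arc::clone(&sh), id: None };
    let mut b = RecvFuture { sh: Arc::clone(&sh), id: None };
    assert!(Pin::new(&mut a).poll(&mut Context::from_waker(&wa)).is_pending());
    assert!(Pin::new(&mut b).poll(&mut Context::from_waker(&wb)).is_pending());
    send(&sh, 7); // takes A's Waker (oldest id)
    drop(a); // A's Drop forwards the wakeup to B
    let woken = cb.0.load(Ordering::SeqCst);
    let got = match Pin::new(&mut b).poll(&mut Context::from_waker(&wb)) {
        Poll::Ready(v) => Some(v),
        Poll::Pending => None,
    };
    (woken, got)
}

fn main() {
    println!("{:?}", demo()); // (1, Some(7))
}
```

This is a sketch rather than a proof. It can still produce a spurious wakeup (the forwarded waiter may find the element already taken), which is harmless here because poll re-checks the queue and parks again.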
What solutions are floating around out there? There must be cases where waking up all wakers is too costly, so one would want a more fine-grained solution that still doesn't run the risk of losing a wake event.