Async, waiting, multithreaded executors, cancel safety and micro-optimizations

tl;dr: How can one simulate Condvar::notify_one() to wake up a single Future without risking that the wake-request gets lost because the Future is being cancelled?


For purposes of illustration, assume we're working on an MPMC channel that supports senders and receivers on both threads and async tasks. The structure shared between the senders and receivers is a VecDeque behind a Mutex.

For the threaded case, the receiver locks the shared queue. If there are any elements on the queue, it pops one off and returns it. Otherwise, it waits on a Condvar (also shared between the senders and receivers). When a sender adds an element to the queue, it can choose to notify either one waiter or all waiters. Let's assume that only one element is added at a time, so we use notify_one(), which wakes up a single waiting receiver that will pick up the new element.
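
A minimal sketch of the threaded side, using a hypothetical `Shared<T>` that holds only the queue and the `Condvar` (the async wakers are left out). The receiver re-checks the queue in a loop after every wake-up, because `Condvar` wake-ups can be spurious and another receiver may have taken the element first. A thread blocked in `wait()` can't be cancelled from the outside the way a Future can be dropped, so a `notify_one()` that reaches it is always acted on.

```rust
use std::collections::VecDeque;
use std::sync::{Condvar, Mutex};

// Hypothetical threaded-only version of the shared state.
struct Shared<T> {
    q: Mutex<VecDeque<T>>,
    signal: Condvar, // For waking up threads
}

impl<T> Shared<T> {
    fn new() -> Self {
        Shared { q: Mutex::new(VecDeque::new()), signal: Condvar::new() }
    }

    fn send(&self, e: T) {
        self.q.lock().unwrap().push_back(e);
        // One element was added, so waking one waiting receiver is enough.
        self.signal.notify_one();
    }

    fn recv(&self) -> T {
        let mut q = self.q.lock().unwrap();
        loop {
            if let Some(e) = q.pop_front() {
                return e;
            }
            // Atomically unlocks the Mutex and blocks until notified;
            // the lock is re-acquired before wait() returns.
            q = self.signal.wait(q).unwrap();
        }
    }
}
```

Because the emptiness check and the `wait()` both happen under the same lock, a `send()` can't slip in between them and have its notification missed.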

For the async/task case, issuing a receive creates a Future whose poll() locks the queue and checks whether there are any elements. If there are, it pops one and returns it in Poll::Ready. If there aren't, it needs to store its Waker somewhere the sender can reach it (so the sender can wake it once an element is added to the queue), and then return Poll::Pending. The Waker could be stored in an Option<Waker> inside the same Mutex-protected structure that is shared between the senders and receivers.
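
A minimal sketch of this single-slot version. The names (`Shared::new`, `counter`, `wakes`, `poll_with`) are hypothetical. The futures are polled by hand with a counting `Waker` rather than run on a real executor. The sketch also shows why one slot isn't enough once there are two receivers: the second `poll()` overwrites the first receiver's `Waker`.

```rust
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

struct Inner<T> {
    q: VecDeque<T>,
    rx_waker: Option<Waker>, // Room for exactly one waiting receiver
}

struct Shared<T> {
    inner: Mutex<Inner<T>>,
}

impl<T> Shared<T> {
    fn new() -> Arc<Self> {
        let inner = Inner { q: VecDeque::new(), rx_waker: None };
        Arc::new(Shared { inner: Mutex::new(inner) })
    }

    fn send(&self, e: T) {
        let mut inner = self.inner.lock().unwrap();
        inner.q.push_back(e);
        let waker = inner.rx_waker.take();
        drop(inner); // Release the lock before waking
        if let Some(w) = waker {
            w.wake();
        }
    }
}

struct RecvFuture<T> {
    sh: Arc<Shared<T>>,
}

impl<T> Future for RecvFuture<T> {
    type Output = T;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
        let mut inner = self.sh.inner.lock().unwrap();
        if let Some(e) = inner.q.pop_front() {
            return Poll::Ready(e);
        }
        // Store our Waker for the sender, replacing any previous one
        inner.rx_waker = Some(cx.waker().clone());
        Poll::Pending
    }
}

// Test helpers: a Waker that just counts wake-ups, and a single manual poll.
struct Counter(AtomicUsize);
impl Wake for Counter {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}
fn counter() -> Arc<Counter> { Arc::new(Counter(AtomicUsize::new(0))) }
fn wakes(c: &Counter) -> usize { c.0.load(Ordering::SeqCst) }
fn poll_with<F: Future + Unpin>(f: &mut F, c: &Arc<Counter>) -> Poll<F::Output> {
    let waker = Waker::from(Arc::clone(c));
    Pin::new(f).poll(&mut Context::from_waker(&waker))
}
```

Driving it by hand works like this. Poll receiver `a`, then receiver `b`; both return `Pending`. Then call `send(7)`. Only `b`'s `Waker` fires, and `a` is never woken.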

However, we're supporting multiple receivers, so we need to be able to store multiple Wakers. We can use a Vec instead: when a sender adds a new element to the queue, it pops a Waker off the Vec and calls wake() on it. That supports multiple receivers, and still lets a sender wake a single receiver when a single element is added to the queue.

However (again), this doesn't account for cancellation. If a receiver's Future is waiting in a select!{} and another Future in the select!{} resolves first, the receiver's Future is dropped, but the clone of its Waker stays behind in the Vec. To handle this, the Future that waits for an element to arrive on the queue gets a Drop implementation that removes its Waker from the Vec... but how? It needs to be able to identify which Waker to remove. To solve this, we generate a unique identifier for each Future and use a map from identifier to Waker. We'll use an IndexMap, because it offers pop(), which means a sender can easily wake up a single receiver, mimicking the behavior of Condvar::notify_one().
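
A sketch of the plain `Vec<Waker>` version makes the problem concrete. It assumes hypothetical helpers (`counter`, `wakes`, `poll_with`) that poll the futures by hand with a counting `Waker` instead of running a real executor. A cancelled `select!{}` arm is modeled by simply dropping that receiver's Future. Nothing identifies a `Waker` in the `Vec`, so the dropped receiver's stale `Waker` stays behind. It can then swallow the single wake-up meant for a live receiver.

```rust
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

struct Inner<T> {
    q: VecDeque<T>,
    rx_wakers: Vec<Waker>, // One entry per receiver that returned Pending
}

struct Shared<T> {
    inner: Mutex<Inner<T>>,
}

impl<T> Shared<T> {
    fn new() -> Arc<Self> {
        let inner = Inner { q: VecDeque::new(), rx_wakers: Vec::new() };
        Arc::new(Shared { inner: Mutex::new(inner) })
    }

    fn send(&self, e: T) {
        let mut inner = self.inner.lock().unwrap();
        inner.q.push_back(e);
        // Wake a single receiver: whichever Waker is on top of the Vec
        let waker = inner.rx_wakers.pop();
        drop(inner); // Release the lock before waking
        if let Some(w) = waker {
            w.wake();
        }
    }
}

// No Drop impl: nothing identifies which Vec entry belongs to which Future.
struct RecvFuture<T> {
    sh: Arc<Shared<T>>,
}

impl<T> Future for RecvFuture<T> {
    type Output = T;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
        let mut inner = self.sh.inner.lock().unwrap();
        if let Some(e) = inner.q.pop_front() {
            return Poll::Ready(e);
        }
        inner.rx_wakers.push(cx.waker().clone());
        Poll::Pending
    }
}

// Test helpers: a Waker that just counts wake-ups, and a single manual poll.
struct Counter(AtomicUsize);
impl Wake for Counter {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}
fn counter() -> Arc<Counter> { Arc::new(Counter(AtomicUsize::new(0))) }
fn wakes(c: &Counter) -> usize { c.0.load(Ordering::SeqCst) }
fn poll_with<F: Future + Unpin>(f: &mut F, c: &Arc<Counter>) -> Poll<F::Output> {
    let waker = Waker::from(Arc::clone(c));
    Pin::new(f).poll(&mut Context::from_waker(&waker))
}
```

In a hand-driven run, receiver `b` registers, then `a` registers and is dropped. `send()` then pops `a`'s stale `Waker`, so `b` is never woken even though an element is waiting.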

In a nutshell, this is what we've ended up with:

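```rust
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Condvar, Mutex};
use std::task::{Context, Poll, Waker};

use indexmap::IndexMap; // From the `indexmap` crate

struct Inner<T> {
  q: VecDeque<T>,
  rx_wakers: IndexMap<usize, Waker>, // For waking up async tasks
}

struct Shared<T> {
  inner: Mutex<Inner<T>>,
  signal: Condvar,      // For waking up threads
  next_id: AtomicUsize, // Source of unique receiver ids
}

struct RecvFuture<T> {
  sh: Arc<Shared<T>>,
  id: Option<usize>,
}

impl<T> Future for RecvFuture<T> {
  type Output = T;

  fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<T> {
    let this = self.get_mut(); // RecvFuture is Unpin
    let mut inner = this.sh.inner.lock().unwrap();
    if let Some(e) = inner.q.pop_front() {
      // Got an element; remove our Waker unless a sender already popped it
      if let Some(id) = this.id.take() {
        inner.rx_wakers.swap_remove(&id);
      }
      return Poll::Ready(e);
    }
    // Need to put Future in Pending state; keep the same id across polls
    let id = match this.id {
      Some(id) => id,
      None => {
        let id = this.sh.next_id.fetch_add(1, Ordering::Relaxed);
        this.id = Some(id);
        id
      }
    };
    inner.rx_wakers.insert(id, ctx.waker().clone());
    Poll::Pending
  }
}

impl<T> Drop for RecvFuture<T> {
  fn drop(&mut self) {
    if let Some(id) = self.id {
      // Has returned Pending at some point, so remove our Waker
      let mut inner = self.sh.inner.lock().unwrap();
      inner.rx_wakers.swap_remove(&id);
    }
  }
}

struct Sender<T> {
  sh: Arc<Shared<T>>,
}

impl<T> Sender<T> {
  fn send(&self, e: T) {
    let mut inner = self.sh.inner.lock().unwrap();
    inner.q.push_back(e);
    // Wake up a single, arbitrary, receiver Future, if there are any waiting
    let waker = inner.rx_wakers.pop().map(|(_id, waker)| waker);
    drop(inner); // Release the lock before waking
    if let Some(waker) = waker {
      waker.wake();
    }
    // Wake up a single blocked receiver thread, if any
    self.sh.signal.notify_one();
  }
}
```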

This is all leading up to the question. Given a multi-threaded executor, assume the following state:

  • A RecvFuture in a select!{} has been polled once and returned Poll::Pending, so it has added its Waker to rx_wakers
  • This RecvFuture is being cancelled (because another arm in the select!{} has resolved)

Is there a window between the cancellation and the Future being dropped in which another thread/task could take the Future's Waker out of rx_wakers and call .wake() on it, with that wake-up getting lost because the executor is in the process of dropping the Future?
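
The scenario can be replayed deterministically with the design above. The sketch assumes a `Vec<(usize, Waker)>` as a stdlib-only stand-in for `IndexMap<usize, Waker>`; like `IndexMap::pop()`, `Vec::pop()` removes the most recently inserted entry. It also uses hypothetical helpers (`counter`, `wakes`, `poll_with`) that poll by hand with a counting `Waker`. Receiver `a` plays the `select!{}` arm that loses. The sender pops `a`'s `Waker` and wakes it, and then `a` is dropped without being polled again. Its `Drop` has nothing left to remove, the element stays queued, and `b` is never woken. No threads are involved here, which suggests the window is not specific to multi-threaded executors.

```rust
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

struct Inner<T> {
    q: VecDeque<T>,
    rx_wakers: Vec<(usize, Waker)>, // Stand-in for IndexMap<usize, Waker>
    next_id: usize,                 // Source of unique receiver ids
}

struct Shared<T> {
    inner: Mutex<Inner<T>>,
}

impl<T> Shared<T> {
    fn new() -> Arc<Self> {
        let inner = Inner { q: VecDeque::new(), rx_wakers: Vec::new(), next_id: 0 };
        Arc::new(Shared { inner: Mutex::new(inner) })
    }

    fn send(&self, e: T) {
        let mut inner = self.inner.lock().unwrap();
        inner.q.push_back(e);
        // Wake up a single receiver (the most recently registered), if any
        let waker = inner.rx_wakers.pop();
        drop(inner); // Release the lock before waking
        if let Some((_id, w)) = waker {
            w.wake();
        }
    }
}

struct RecvFuture<T> {
    sh: Arc<Shared<T>>,
    id: Option<usize>,
}

impl<T> Future for RecvFuture<T> {
    type Output = T;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
        let this = self.get_mut(); // RecvFuture is Unpin
        let mut inner = this.sh.inner.lock().unwrap();
        if let Some(e) = inner.q.pop_front() {
            if let Some(id) = this.id.take() {
                inner.rx_wakers.retain(|(i, _)| *i != id);
            }
            return Poll::Ready(e);
        }
        // Assign a fresh id on the first Pending, then reuse it
        let id = *this.id.get_or_insert_with(|| { inner.next_id += 1; inner.next_id });
        inner.rx_wakers.retain(|(i, _)| *i != id); // Drop any older entry of ours
        inner.rx_wakers.push((id, cx.waker().clone()));
        Poll::Pending
    }
}

impl<T> Drop for RecvFuture<T> {
    fn drop(&mut self) {
        if let Some(id) = self.id {
            // Remove our Waker, if a sender hasn't already popped it
            self.sh.inner.lock().unwrap().rx_wakers.retain(|(i, _)| *i != id);
        }
    }
}

// Test helpers: a Waker that just counts wake-ups, and a single manual poll.
struct Counter(AtomicUsize);
impl Wake for Counter {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}
fn counter() -> Arc<Counter> { Arc::new(Counter(AtomicUsize::new(0))) }
fn wakes(c: &Counter) -> usize { c.0.load(Ordering::SeqCst) }
fn poll_with<F: Future + Unpin>(f: &mut F, c: &Arc<Counter>) -> Poll<F::Output> {
    let waker = Waker::from(Arc::clone(c));
    Pin::new(f).poll(&mut Context::from_waker(&waker))
}
```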

(I was so hyper-focused on the async case that I didn't realize the trivial answer is "yes, definitely, because of non-async threads".)

But regardless: is there anything in an executor that ensures this isn't a problem (for the async case)? I have a gut feeling that this is something a multi-threaded executor can't really solve. I also realize executors have a lot of degrees of freedom, so I'm asking specifically about tokio here.

In writing this I've convinced myself that this is an issue that needs to be handled. But what solutions are there? One brute-force solution would be for a sender to wake up all wakers whenever a new element is added (and simply let all but one go back to Pending naturally). I imagine there's some magic the Drop implementation could do instead: determine whether the RecvFuture has been selected to pop an element, and if it ends up in Drop before it gets to pop that element, simply wake another Waker (if one is available).
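
A sketch of the brute-force variant. It assumes a `Vec<(usize, Waker)>` as a stdlib-only stand-in for `IndexMap<usize, Waker>` and hypothetical hand-polling helpers (`counter`, `wakes`, `poll_with`). Compared with the single-wake design, only `send()` changes: it drains and wakes every registered `Waker`. A receiver that is cancelled right after being woken no longer strands the element, because every other waiter was woken too. The price is one wake-up and one extra poll per waiting receiver on every send.

```rust
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

struct Inner<T> {
    q: VecDeque<T>,
    rx_wakers: Vec<(usize, Waker)>, // Stand-in for IndexMap<usize, Waker>
    next_id: usize,                 // Source of unique receiver ids
}

struct Shared<T> {
    inner: Mutex<Inner<T>>,
}

impl<T> Shared<T> {
    fn new() -> Arc<Self> {
        let inner = Inner { q: VecDeque::new(), rx_wakers: Vec::new(), next_id: 0 };
        Arc::new(Shared { inner: Mutex::new(inner) })
    }

    fn send(&self, e: T) {
        let mut inner = self.inner.lock().unwrap();
        inner.q.push_back(e);
        // Brute force: wake every waiting receiver; all but one will find the
        // queue empty on their next poll and simply return Pending again
        let wakers: Vec<(usize, Waker)> = inner.rx_wakers.drain(..).collect();
        drop(inner); // Release the lock before waking
        for (_id, w) in wakers {
            w.wake();
        }
    }
}

struct RecvFuture<T> {
    sh: Arc<Shared<T>>,
    id: Option<usize>,
}

impl<T> Future for RecvFuture<T> {
    type Output = T;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
        let this = self.get_mut(); // RecvFuture is Unpin
        let mut inner = this.sh.inner.lock().unwrap();
        if let Some(e) = inner.q.pop_front() {
            if let Some(id) = this.id.take() {
                inner.rx_wakers.retain(|(i, _)| *i != id);
            }
            return Poll::Ready(e);
        }
        // Assign a fresh id on the first Pending, then reuse it
        let id = *this.id.get_or_insert_with(|| { inner.next_id += 1; inner.next_id });
        inner.rx_wakers.retain(|(i, _)| *i != id); // Drop any older entry of ours
        inner.rx_wakers.push((id, cx.waker().clone()));
        Poll::Pending
    }
}

impl<T> Drop for RecvFuture<T> {
    fn drop(&mut self) {
        if let Some(id) = self.id {
            // Remove our Waker, if a sender hasn't already taken it
            self.sh.inner.lock().unwrap().rx_wakers.retain(|(i, _)| *i != id);
        }
    }
}

// Test helpers: a Waker that just counts wake-ups, and a single manual poll.
struct Counter(AtomicUsize);
impl Wake for Counter {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}
fn counter() -> Arc<Counter> { Arc::new(Counter(AtomicUsize::new(0))) }
fn wakes(c: &Counter) -> usize { c.0.load(Ordering::SeqCst) }
fn poll_with<F: Future + Unpin>(f: &mut F, c: &Arc<Counter>) -> Poll<F::Output> {
    let waker = Waker::from(Arc::clone(c));
    Pin::new(f).poll(&mut Context::from_waker(&waker))
}
```

Replaying the cancellation race by hand: receivers `b` and `a` both return `Pending`, and one `send(1)` wakes both. Then `a` is dropped, and `b`'s next poll returns `Ready(1)`.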

What solutions are floating around out there? There must be cases where waking up all wakers is too costly, so one would want a more fine-grained solution, but one that doesn't run the risk of losing a wake-up.
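
One finer-grained option is the `Drop` trick described above. Suppose a receiver finds its `Waker` missing from `rx_wakers` even though it never returned `Ready`. It then knows a sender popped that `Waker`, and if the element is still queued, it passes the wake-up to the next waiter. This sketch keeps the same stand-ins as before (a `Vec<(usize, Waker)>` for `IndexMap`, and hypothetical hand-polling helpers `counter`, `wakes`, `poll_with`) and changes only `Drop`. The sender's `pop()` and the receiver's `Drop` both run under the same `Mutex`, so there are two possible orders. Either the sender pops first and `Drop` forwards the wake-up, or `Drop` removes the `Waker` first and the sender picks another receiver. In this sketch the wake-up is not lost in either order.

```rust
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

struct Inner<T> {
    q: VecDeque<T>,
    rx_wakers: Vec<(usize, Waker)>, // Stand-in for IndexMap<usize, Waker>
    next_id: usize,                 // Source of unique receiver ids
}

struct Shared<T> {
    inner: Mutex<Inner<T>>,
}

impl<T> Shared<T> {
    fn new() -> Arc<Self> {
        let inner = Inner { q: VecDeque::new(), rx_wakers: Vec::new(), next_id: 0 };
        Arc::new(Shared { inner: Mutex::new(inner) })
    }

    fn send(&self, e: T) {
        let mut inner = self.inner.lock().unwrap();
        inner.q.push_back(e);
        // Wake up a single receiver (the most recently registered), if any
        let waker = inner.rx_wakers.pop();
        drop(inner); // Release the lock before waking
        if let Some((_id, w)) = waker {
            w.wake();
        }
    }
}

struct RecvFuture<T> {
    sh: Arc<Shared<T>>,
    id: Option<usize>,
}

impl<T> Future for RecvFuture<T> {
    type Output = T;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
        let this = self.get_mut(); // RecvFuture is Unpin
        let mut inner = this.sh.inner.lock().unwrap();
        if let Some(e) = inner.q.pop_front() {
            if let Some(id) = this.id.take() {
                inner.rx_wakers.retain(|(i, _)| *i != id);
            }
            return Poll::Ready(e);
        }
        // Assign a fresh id on the first Pending, then reuse it
        let id = *this.id.get_or_insert_with(|| { inner.next_id += 1; inner.next_id });
        inner.rx_wakers.retain(|(i, _)| *i != id); // Drop any older entry of ours
        inner.rx_wakers.push((id, cx.waker().clone()));
        Poll::Pending
    }
}

impl<T> Drop for RecvFuture<T> {
    fn drop(&mut self) {
        let id = match self.id {
            Some(id) => id,
            None => return, // Never returned Pending, or already got an element
        };
        let mut inner = self.sh.inner.lock().unwrap();
        let before = inner.rx_wakers.len();
        inner.rx_wakers.retain(|(i, _)| *i != id);
        // Nothing removed: a sender already popped our Waker to hand us an
        // element. If it is still queued, pass the wake-up on instead of losing it.
        let notified = inner.rx_wakers.len() == before;
        let next = if notified && !inner.q.is_empty() { inner.rx_wakers.pop() } else { None };
        drop(inner); // Release the lock before waking
        if let Some((_id, w)) = next {
            w.wake();
        }
    }
}

// Test helpers: a Waker that just counts wake-ups, and a single manual poll.
struct Counter(AtomicUsize);
impl Wake for Counter {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}
fn counter() -> Arc<Counter> { Arc::new(Counter(AtomicUsize::new(0))) }
fn wakes(c: &Counter) -> usize { c.0.load(Ordering::SeqCst) }
fn poll_with<F: Future + Unpin>(f: &mut F, c: &Arc<Counter>) -> Poll<F::Output> {
    let waker = Waker::from(Arc::clone(c));
    Pin::new(f).poll(&mut Context::from_waker(&waker))
}
```

Replaying the same race by hand: `send(1)` pops and wakes only `a`. When `a` is dropped without being polled, its `Drop` finds its `Waker` already gone and sees the element still queued. It therefore wakes `b`, whose next poll returns `Ready(1)`.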
