Missing notifications with Tokio Notify and purpose of enable()

I'm working with Notify from Tokio and I'm interested in how a waiter can "miss" a notification.

There's this line from the Tokio docs

The Notified future is not guaranteed to receive wakeups from calls to notify_one() if it has not yet been polled. See the documentation for Notified::enable() for more details.

So my understanding is that this code should (or at least can) hang forever if "worker: notifying" is printed before "main: awaiting notification", because the Notified future is not polled (via .await) until after notify_one() is called.

use std::sync::Arc;
use tokio::sync::Notify;
use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() {
    let notify = Arc::new(Notify::new());
    let notified = notify.notified(); // not yet polled

    let worker_notify = notify.clone();

    tokio::spawn(async move {
        worker_notify.notify_one();
        println!("worker: notifying");
    });

    sleep(Duration::from_millis(1000)).await;
    println!("main: awaiting notification");
    notified.await;
    println!("main: received notification");
}

But it always exits.

Elsewhere in the docs it says

If notify_one() is called before notified().await, then the next call to notified().await will complete immediately, consuming the permit. Any subsequent calls to notified().await will wait for a new permit.

(emphasis mine) which seems to contradict what it says above.

The docs also say

The Notified future is guaranteed to receive wakeups from notify_waiters() as soon as it has been created, even if it has not yet been polled.

My understanding is that you need to either use notify_waiters()

use std::sync::Arc;
use tokio::sync::Notify;
use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() {
    let notify = Arc::new(Notify::new());
    let notified = notify.notified(); // not yet polled

    let worker_notify = notify.clone();

    tokio::spawn(async move {
        worker_notify.notify_waiters(); // changed this line
        println!("worker: notifying");
    });

    sleep(Duration::from_millis(1000)).await;
    println!("main: awaiting notification");
    notified.await;
    println!("main: received notification");
}

or use enable(). Per the docs:

Adds this future to the list of futures that are ready to receive wakeups from calls to notify_one.
Polling the future also adds it to the list, so this method should only be used if you want to add the future to the list before the first call to poll.

use std::sync::Arc;
use tokio::sync::Notify;
use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() {
    let notify = Arc::new(Notify::new());
    let notified = notify.notified(); // not yet polled
    
    // added these two lines
    tokio::pin!(notified);
    notified.as_mut().enable();
    
    let worker_notify = notify.clone();
    
    tokio::spawn(async move {
        worker_notify.notify_one(); 
        println!("worker: notifying");
    });

    sleep(Duration::from_millis(1000)).await;
    println!("main: awaiting notification");
    notified.await;
    println!("main: received notification");
}

The presence of enable() in the API, and the fact that the docs contrast how notify_waiters() works with how notify_one() works, would imply that a notification can be missed. But the fact that I can't reproduce that, and that the docs say "If notify_one() is called before notified().await, then the next call to notified().await will complete immediately", has me very confused.

If I have the pattern of creating a Notify and passing it to another thread that will notify this thread, and I don't want to miss the notification, what is the correct, safe way to ensure that I won't miss it?

Tokio's Notify can store a single permit, and when notify_one() is called without any waiters, it creates a permit if there isn't one already. The permit causes the next waiter to wake up immediately. That's why your first example does not require enable().

notify_waiters() does not create permits, but it requires the future to be created prior to the wakeup, unlike notify_one(), which works even if the future is created after spawning (when using permits).
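
To make the difference concrete without a runtime, here is a deliberately simplified, single-threaded model of the two rules above. It is a sketch, not Tokio's implementation: ToyNotify, ToyNotified and try_complete are hypothetical names, the model ignores enable() and waiter registration entirely, and it only assumes that (a) notify_one() with nobody registered stores at most one permit and (b) notify_waiters() only affects futures that already exist when it is called.

```rust
/// Hypothetical, simplified model of tokio::sync::Notify (NOT Tokio's code).
/// It ignores enable()/poll registration and tracks only:
/// - a single stored permit (set by notify_one when nobody is registered)
/// - a counter of notify_waiters() calls
struct ToyNotify {
    permit: bool,
    waiters_calls: u64,
}

/// Stand-in for a Notified future: remembers the counter at creation time.
struct ToyNotified {
    snapshot: u64,
}

impl ToyNotify {
    fn new() -> Self {
        ToyNotify { permit: false, waiters_calls: 0 }
    }

    /// The snapshot is taken when the future is *created*, not when it is awaited.
    fn notified(&self) -> ToyNotified {
        ToyNotified { snapshot: self.waiters_calls }
    }

    /// No registered waiters in this model, so this only stores a permit.
    /// Calling it again while a permit is stored changes nothing.
    fn notify_one(&mut self) {
        self.permit = true;
    }

    /// Never creates a permit; only bumps the counter.
    fn notify_waiters(&mut self) {
        self.waiters_calls += 1;
    }

    /// Models one poll of `fut`: returns true if it would complete now.
    fn try_complete(&mut self, fut: &ToyNotified) -> bool {
        if self.waiters_calls != fut.snapshot {
            return true; // a notify_waiters() call happened after creation
        }
        if self.permit {
            self.permit = false; // consume the single stored permit
            return true;
        }
        false // would keep waiting
    }
}

fn main() {
    // notify_one() before the future exists: the stored permit completes it.
    let mut a = ToyNotify::new();
    a.notify_one();
    let late = a.notified();
    println!("notify_one, created after: {}", a.try_complete(&late)); // true

    // notify_waiters() before the future exists: nothing was stored.
    let mut b = ToyNotify::new();
    b.notify_waiters();
    let late = b.notified();
    println!("notify_waiters, created after: {}", b.try_complete(&late)); // false

    // notify_waiters() after the future exists: the future sees the change.
    let mut c = ToyNotify::new();
    let early = c.notified();
    c.notify_waiters();
    println!("notify_waiters, created before: {}", c.try_complete(&early)); // true

    // Two notify_one() calls still leave only one permit.
    let mut d = ToyNotify::new();
    d.notify_one();
    d.notify_one();
    let (f1, f2) = (d.notified(), d.notified());
    let first = d.try_complete(&f1);
    let second = d.try_complete(&f2);
    println!("two notify_one calls: {first} {second}"); // true false
}
```

The last case is the single-permit limit that makes enable() necessary once several futures are waiting.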

Since only a single permit can be stored, enable is required if there may be multiple wakeups in flight. This is why the mpsc example in the Notify docs doesn't use enable, whereas the mpmc example does; the mpmc example explains this in more detail.

In conclusion, you need enable only if you use notify_one() and there are multiple waiters at the same time.
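
A small, single-threaded model may help show why enable() matters once several futures wait at the same time. This is a sketch under stated assumptions, not Tokio's implementation: futures are represented by hypothetical usize ids, ToyNotify, enable and try_complete are made-up names, and the model wakes the oldest registered future first, which is a choice of this sketch rather than something taken from the thread.

```rust
use std::collections::{HashSet, VecDeque};

/// Hypothetical, simplified model of notify_one() + enable() (NOT Tokio's code).
/// Each Notified future is represented by a plain usize id.
struct ToyNotify {
    permit: bool,             // at most one stored permit
    enabled: VecDeque<usize>, // futures registered by enable() or by a pending poll
    woken: HashSet<usize>,    // futures woken directly by notify_one()
}

impl ToyNotify {
    fn new() -> Self {
        ToyNotify { permit: false, enabled: VecDeque::new(), woken: HashSet::new() }
    }

    /// Register future `id` so that notify_one() can see it.
    fn enable(&mut self, id: usize) {
        if !self.enabled.contains(&id) && !self.woken.contains(&id) {
            self.enabled.push_back(id);
        }
    }

    /// Wake a registered future if there is one (this model picks the oldest);
    /// otherwise store a permit. A second permit is never stored.
    fn notify_one(&mut self) {
        match self.enabled.pop_front() {
            Some(id) => {
                self.woken.insert(id);
            }
            None => self.permit = true,
        }
    }

    /// Models polling future `id`: returns true if it completes now.
    fn try_complete(&mut self, id: usize) -> bool {
        if self.woken.remove(&id) {
            return true; // woken directly by notify_one()
        }
        if self.permit {
            self.permit = false; // consume the single stored permit
            self.enabled.retain(|&w| w != id);
            return true;
        }
        self.enable(id); // a pending poll registers the future
        false
    }
}

fn main() {
    // Without enable(): the second notify_one() finds a permit already stored.
    let mut n = ToyNotify::new();
    n.notify_one();
    n.notify_one(); // no effect
    let first = n.try_complete(1);
    let second = n.try_complete(2);
    println!("no enable: {first} {second}"); // true false

    // With enable(): each notify_one() wakes one registered future directly.
    let mut m = ToyNotify::new();
    m.enable(1);
    m.enable(2);
    m.notify_one();
    m.notify_one();
    let first = m.try_complete(1);
    let second = m.try_complete(2);
    println!("enabled:   {first} {second}"); // true true
}
```

In the first half the second notification is effectively lost, because notify_one() can only set a permit that already exists; in the second half both calls find a registered future, so nothing depends on the single permit.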

Thank you for the detailed explanation.

notify_waiters() does not create permits, but [notify_waiters()] requires the future to be created prior to the wakeup, unlike notify_one() which works even if the future is created after spawning (when using permits).

Can you explain this a bit more? Based on the rest of your explanation, I would have expected notify_one() and notify_waiters() to be swapped in the bolded part of that sentence.

In conclusion, you need enable only if you use notify_one() and there are multiple waiters at the same time.

That makes sense, and this hangs consistently:

use std::sync::Arc;
use tokio::sync::Notify;
use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() {
    let notify = Arc::new(Notify::new());
    let notify_clone = notify.clone();
    let notified_1 = notify_clone.notified(); // not yet polled
    let notified_2 = notify.notified(); // not yet polled

    let worker_notify = notify.clone();

    tokio::spawn(async move {
        worker_notify.notify_one();
        println!("worker: notifying");
    });

    sleep(Duration::from_millis(1000)).await;
    println!("main: awaiting notification_1");
    notified_1.await;
    println!("main: received on notification_1, awaiting on notification_2");
    notified_2.await;
    println!("main: received on notification_2");
}

Let me explain by example. Consider:

use std::sync::Arc;
use tokio::sync::Notify;

#[tokio::main]
async fn main() {
    let notify = Arc::new(Notify::new());
    let notified1 = notify.notified(); // not yet polled
    let notified2 = notify.notified(); // not yet polled
    notify.notify_waiters();
    notified1.await; // woken up even though not enabled
    notified2.await; // woken up even though not enabled
}

This example works because notify_waiters can affect futures before they are polled or enabled.

But this does not work:

use std::sync::Arc;
use tokio::sync::Notify;

#[tokio::main]
async fn main() {
    let notify = Arc::new(Notify::new());
    let notified1 = notify.notified(); // not yet polled
    let notified2 = notify.notified(); // not yet polled
    notify.notify_one(); // creates permit
    notify.notify_one(); // already has permit, so noop
    notified1.await; // consumes permit and succeeds
    notified2.await; // sleeps forever - there is no permit
}

This is because notify_one() can't see futures unless they have been polled or enabled.

If you add enable, you get:

use std::pin::pin;
use std::sync::Arc;
use tokio::sync::Notify;

#[tokio::main]
async fn main() {
    let notify = Arc::new(Notify::new());
    let mut notified1 = pin!(notify.notified()); // not yet polled
    let mut notified2 = pin!(notify.notified()); // not yet polled
    notified1.as_mut().enable();
    notified2.as_mut().enable();
    notify.notify_one(); // wake up notified1 (no permit created)
    notify.notify_one(); // wake up notified2 (no permit created)
    notified1.await; // woken up by notify_one() directly
    notified2.await; // woken up by notify_one() directly
}

Thanks for the reply. The behavior itself is clear to me now, but your quote below seems to contradict the behavior you demonstrated.

Could you explain this quote a bit more? "notify_one() which works even if the future is created after spawning (when using permits)" seems to be true only if there are not multiple waiters.

And "notify_waiters() does not create permits, but it requires the future to be created prior to the wakeup" seems false, because in your first example the wakeup, notify.notify_waiters(), happens before the future is created on the next line, notified1.await.

Yes, that's exactly the point; that's why it is called "notify one".

await does not create a future; it polls the future. The future is created earlier, in the notify.notified() call.

The way notify_waiters() works is that there is a counter for the number of calls to notify_waiters(), and calling notified() stores the current value of that counter in the new future. When you later await the future, it checks whether the number of calls has changed. Therefore, the program below gets past futures 1 and 2, but not 3.

use std::sync::Arc;
use tokio::sync::Notify;

#[tokio::main]
async fn main() {
    let notify = Arc::new(Notify::new());
    let notified1 = notify.notified(); // num calls = 0
    let notified2 = notify.notified(); // num calls = 0
    notify.notify_waiters(); // num calls = 1
    let notified3 = notify.notified(); // num calls = 1
    notified1.await; // observes that num calls > 0, so exits
    notified2.await; // observes that num calls > 0, so exits
    notified3.await; // num calls is still 1, so sleeps
}
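
To see why creating the future and awaiting it are separate steps, here is a minimal hand-written future that follows the counter description above, using only std. ToyNotify, ToyNotified, NoopWaker and poll_once are hypothetical names; this sketches the described idea and is not Tokio's Notified, which also keeps a waiter list, stores wakers, and handles notify_one() permits. Polling once by hand with a no-op waker stands in for .await.

```rust
use std::future::Future;
use std::pin::{pin, Pin};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for Notify that only supports notify_waiters().
struct ToyNotify {
    calls: AtomicU64,
}

impl ToyNotify {
    fn notified(&self) -> ToyNotified<'_> {
        // The snapshot is taken here, when the future is created...
        ToyNotified { notify: self, snapshot: self.calls.load(Ordering::SeqCst) }
    }

    fn notify_waiters(&self) {
        self.calls.fetch_add(1, Ordering::SeqCst);
    }
}

struct ToyNotified<'a> {
    notify: &'a ToyNotify,
    snapshot: u64,
}

impl Future for ToyNotified<'_> {
    type Output = ();

    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        // ...and only compared here, when the future is polled (e.g. by .await).
        if self.notify.calls.load(Ordering::SeqCst) != self.snapshot {
            Poll::Ready(())
        } else {
            // A real implementation would store cx.waker() to be woken later.
            Poll::Pending
        }
    }
}

/// A waker that does nothing; enough for polling by hand.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Polls `fut` once and reports whether it completed.
fn poll_once<F: Future>(fut: Pin<&mut F>) -> bool {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    fut.poll(&mut cx).is_ready()
}

fn main() {
    let notify = ToyNotify { calls: AtomicU64::new(0) };
    let mut f1 = pin!(notify.notified()); // snapshot = 0
    let mut f2 = pin!(notify.notified()); // snapshot = 0
    notify.notify_waiters(); // calls = 1
    let mut f3 = pin!(notify.notified()); // snapshot = 1
    println!("f1 ready: {}", poll_once(f1.as_mut())); // true
    println!("f2 ready: {}", poll_once(f2.as_mut())); // true
    println!("f3 ready: {}", poll_once(f3.as_mut())); // false
}
```

f3 was created after the notify_waiters() call, so its snapshot already includes that call and it stays pending, just like notified3 in the program above.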