A flag type that supports waiting asynchronously

I want to make a flag object that controls whether some tasks can be executed. It works like this:

  • The flag has two states: enabled and disabled.
  • There are some worker tasks, each of which runs only when the flag is enabled:
    • if the flag is currently enabled, the task runs immediately,
    • otherwise, it waits for the next time the flag becomes enabled.
  • There may be some other tasks that enable or disable the flag as they wish.

So the API of the flag could be:

pub struct Flag { ... }

impl Flag {
    /// Creates a new flag object.
    pub fn new(enabled: bool) -> Self { ... }

    /// Enables the flag.
    pub fn enable(&self) { ... }

    /// Disables the flag.
    pub fn disable(&self) { ... }

    /// Waits for the flag to become enabled.
    pub async fn wait_enabled(&self) { ... }
}

Here is my implementation:

use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::task::{Context, Poll};
use tokio::sync::futures::Notified;
use tokio::sync::Notify;

pin_project_lite::pin_project! {
    #[project = WaitEnabledProjection]
    pub struct WaitEnabled<'a> {
        #[pin]
        notified: Option<Notified<'a>>,
    }
}

impl Future for WaitEnabled<'_> {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        match self.project().notified.as_pin_mut() {
            None => Poll::Ready(()),
            Some(notified) => notified.poll(cx),
        }
    }
}

pub struct Flag {
    enabled: AtomicBool,
    notify: Notify,
}

impl Flag {
    /// Creates a new flag object.
    pub fn new(enabled: bool) -> Self {
        Self {
            enabled: AtomicBool::new(enabled),
            notify: Notify::new(),
        }
    }

    /// Enables the flag.
    pub fn enable(&self) {
        self.enabled.store(true, Ordering::Relaxed);
        self.notify.notify_waiters();
    }

    /// Disables the flag.
    pub fn disable(&self) {
        self.enabled.store(false, Ordering::Relaxed);
    }

    /// Waits for the flag to become enabled.
    pub fn wait_enabled(&self) -> WaitEnabled<'_> {
        if !self.enabled.load(Ordering::Relaxed) {
            let notified = self.notify.notified();

            // This check prevents losing wake up calls.
            if !self.enabled.load(Ordering::Relaxed) {
                return WaitEnabled {
                    notified: Some(notified),
                };
            }
        }

        WaitEnabled { notified: None }
    }

    /// A simpler alternative implementation of `wait_enabled`. It returns a bigger `Future`, and the
    /// type of that `Future` cannot be named.
    pub async fn wait_enabled_2(&self) {
        if !self.enabled.load(Ordering::Relaxed) {
            let notified = self.notify.notified();

            // This check prevents losing wake up calls.
            if !self.enabled.load(Ordering::Relaxed) {
                notified.await;
            }
        }
    }
}

Here is a playground link for my implementation: https://play.rust-lang.org/?version=stable&mode=debug&edition=2021&gist=501b09d254598b128b80d810ac2f0655.

Here are my questions:

  • Is my implementation correct?
    • I use Relaxed ordering when checking the current state. Is it strong enough to ensure correctness?
    • After the Notified future becomes ready, I do not check the state again, which may be a problem if there are spurious wake-ups. Does the Notified future guarantee that a wake-up only comes from a notify_waiters call?
  • Is there an existing solution to this problem so I don’t have to implement it myself?
  • What is the best way to implement such a flag type?
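
Regarding the spurious wake-up question: independent of what Notified guarantees, a future can be made robust by re-checking the flag on every poll. The following is a minimal std-only sketch of that idea, not tokio's implementation. The names SimpleFlag, is_enabled, ThreadWaker and block_on are hypothetical and exist only for this illustration, and a Mutex<Vec<Waker>> stands in for Notify.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};
use std::thread;

/// Hypothetical std-only flag (an assumption for illustration, not a tokio type).
pub struct SimpleFlag {
    enabled: AtomicBool,
    wakers: Mutex<Vec<Waker>>,
}

impl SimpleFlag {
    pub fn new(enabled: bool) -> Self {
        Self { enabled: AtomicBool::new(enabled), wakers: Mutex::new(Vec::new()) }
    }

    pub fn is_enabled(&self) -> bool {
        self.enabled.load(Ordering::SeqCst)
    }

    pub fn enable(&self) {
        self.enabled.store(true, Ordering::SeqCst);
        // Take the wakers out first so the lock is not held while waking.
        let wakers = std::mem::take(&mut *self.wakers.lock().unwrap());
        for waker in wakers {
            waker.wake();
        }
    }

    pub fn disable(&self) {
        self.enabled.store(false, Ordering::SeqCst);
    }

    pub fn wait_enabled(&self) -> WaitEnabled<'_> {
        WaitEnabled { flag: self }
    }
}

pub struct WaitEnabled<'a> {
    flag: &'a SimpleFlag,
}

impl Future for WaitEnabled<'_> {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        // The state is checked on every poll, so a spurious wake-up only
        // causes one more check instead of a wrong `Ready`.
        if self.flag.is_enabled() {
            return Poll::Ready(());
        }
        let mut wakers = self.flag.wakers.lock().unwrap();
        // Re-check under the lock so that a concurrent `enable` is not missed.
        if self.flag.is_enabled() {
            return Poll::Ready(());
        }
        wakers.push(cx.waker().clone());
        Poll::Pending
    }
}

/// Hypothetical waker that unparks the thread running `block_on`.
struct ThreadWaker(thread::Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Tiny single-future executor, only here to make the sketch runnable.
pub fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            // `park` may return spuriously; the loop simply polls again.
            Poll::Pending => thread::park(),
        }
    }
}
```

Note that this behaves slightly differently from the Notify-based version: a waiter that is woken by enable but only polled after a later disable keeps waiting for the next enable. Repeated polls also push duplicate wakers, which is a simplification a real implementation would want to avoid.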

Maybe tokio::sync::watch can do what you are looking for. Its Sender is Sync and only requires a shared reference for sending (e.g. using send_replace which is my preferred method for sending).

But it's always fun to implement things on your own too. I find anything synchronization related pretty fascinating for some reason.

I feel like that depends on whether Notify establishes some synchronization (or even has to establish it, which might be the case), though I don't really have the full picture. Looking into its source, I see it currently does some SeqCst accesses (which don't always guarantee synchronization in both directions, by the way, because stores synchronize-with loads, and not vice versa). Perhaps, if that (likely unneeded) ordering is partly relaxed in the future, it might behave differently then?


I just found this possibly relevant comment:

// The compare-exchange from `NOTIFIED` -> `NOTIFIED` is intended. A
// happens-before synchronization must happen between this atomic
// operation and a task calling `notified().await`.
let new = set_state(curr, NOTIFIED);
let res = self.state.compare_exchange(curr, new, SeqCst, SeqCst);

So maybe you can rely on this synchronization taking place. :thinking: However, this talks about notified().await and not notified().

Note that between your atomic accesses, you have only a notified() and not a notified().await, so that comment on a happens-before synchronization might not apply there.

But I'm not sure really.

Update: Looks like the notified method loads the same atomic that the aforementioned compare-exchange writes:

    pub fn notified(&self) -> Notified<'_> {
        // we load the number of times notify_waiters
        // was called and store that in the future.
        let state = self.state.load(SeqCst);

So I would deduce that the write performed by your self.notify.notify_waiters() call synchronizes-with the load of that value[1] performed when you call self.notify.notified().

This could mean that your Relaxed load is sufficient. But again: totally not sure. :sweat_smile: I feel like it is sufficient that you use Relaxed, because it's the task of Notify to do that synchronization for you.

For definitions of happens-before and synchronizes-with, you can refer to the (non-official) C++ memory_order reference.
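
To make the reasoning a bit more concrete, here is a small std-only model of the pattern: a Relaxed flag whose visibility is carried by a second atomic that is written with Release and read with Acquire. The `epoch` counter is a hypothetical stand-in for Notify's internal state and `observe_once` is a made-up name. This is a sketch of the general memory-model rule, not a description of what tokio actually does.

```rust
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

/// Returns what the reading thread observed as `(epoch_seen, enabled_seen)`.
/// `epoch` is a hypothetical stand-in for the state inside a notifier.
pub fn observe_once() -> (usize, bool) {
    let enabled = Arc::new(AtomicBool::new(false));
    let epoch = Arc::new(AtomicUsize::new(0));

    let writer = {
        let (enabled, epoch) = (Arc::clone(&enabled), Arc::clone(&epoch));
        thread::spawn(move || {
            // Relaxed store, like the one in `Flag::enable`.
            enabled.store(true, Ordering::Relaxed);
            // Release read-modify-write: publishes everything sequenced before it.
            epoch.fetch_add(1, Ordering::Release);
        })
    };

    // Acquire load: if it reads the value written by the `fetch_add` above,
    // that write synchronizes-with this load, so the writer's earlier Relaxed
    // store happens-before the Relaxed load below.
    let epoch_seen = epoch.load(Ordering::Acquire);
    let enabled_seen = enabled.load(Ordering::Relaxed);

    writer.join().unwrap();
    (epoch_seen, enabled_seen)
}
```

If `epoch_seen` is 1, `enabled_seen` must be true. If `epoch_seen` is 0, nothing is guaranteed about `enabled_seen`, which corresponds to the caveat in the footnote.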


  1. as long as the load reads the particular value that was actually written during the compare-exchange ↩︎


If we only rely on the documented behavior of Notify without looking at its implementation, what memory order should I use?

Even if it's not documented explicitly, I feel like it's the task of a "notify" to establish a happens-before relationship, which means using a Relaxed access in your code is okay. But I'm really not sure. Maybe someone else can tell us.


If it didn't establish a happens-before relationship, I think it could happen that a resource isn't ready when you (think you) have received the notification. So that's why I believe it must always synchronize.


Also compare the thread "Is it okay to use Mutex<bool>?", where I learned that in my case, where I had used a Mutex for synchronization, an AtomicBool with Relaxed ordering is (likely) okay (there, because malloc/free synchronizes).


Or to say it in plain English: It would be weird (and/or wrong?) if waiters being notified didn't "happen after" your notify_waiters call. (I'm still guessing though.)

P.S.: On the other hand, I would not (always?) expect an mpsc channel, for example, to uphold that property.

P.P.S.: Maybe even an mpsc channel must do that for technical reasons (it needs to pass ownership).

How about this one implemented using tokio::sync::watch?

use tokio::sync::watch::{self, Sender};

pub struct Flag {
    sender: Sender<bool>,
}

impl Flag {
    /// Creates a new flag object.
    pub fn new(enabled: bool) -> Self {
        Self {
            sender: watch::channel(enabled).0,
        }
    }

    /// Enables the flag.
    pub fn enable(&self) {
        self.sender.send_if_modified(|value| {
            if *value {
                false
            } else {
                *value = true;

                true
            }
        });
    }

    /// Disables the flag.
    pub fn disable(&self) {
        self.sender.send_if_modified(|value| {
            *value = false;

            false
        });
    }

    /// Waits for the flag to become enabled.
    pub async fn wait_enabled(&self) {
        if !*self.sender.borrow() {
            let mut receiver = self.sender.subscribe();

            if !*receiver.borrow() {
                receiver.changed().await.ok();
            }
        }
    }
}

Playground link: https://play.rust-lang.org/?version=stable&mode=debug&edition=2021&gist=ecdac1b2db1edfe25cbed0599198c990.

This one has two disadvantages:

  • The boolean value is protected by a lock, so the performance could be affected.
  • There is no way to name the wait_enabled Future, since the Future returned by receiver.changed() cannot be named.

I still prefer the one with AtomicBool, if its correctness can be justified.


Oh I see! What you are looking for (and which you implemented) then is a specialized Watch<bool> which exploits the fact that bool has an atomic equivalent.

I understand now how your approach might be superior to using a generic Watch<T> (with T being bool). :+1:

I guess that's an implementation detail, which could be fixed.

Perhaps you'd like to open issue reports on that and see if someone would like to address those concerns (both regarding the explicit documentation of synchronization and also the ability to name the Future as a feature request).

Looking into the source and finding SeqCst, I feel like that's another issue which should be resolved, as likely Acquire, Release, and/or AcqRel, respectively, is sufficient. As far as I can tell, there is only a single atomic involved, so it's useless overhead to use SeqCst. Maybe that's another issue worth reporting (but I'm not sure on that one).

I was wondering: Maybe they use SeqCst on purpose to provide synchronization with other notifies/atomics (i.e. for cases where you use more than a single Notify).
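
As a generic illustration of what SeqCst adds over Acquire/Release once more than one atomic is involved, here is the classic "store buffering" litmus test. The function name `store_buffering_round` is made up for this sketch, and the example says nothing about why tokio itself uses SeqCst.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;

/// Runs one round of the store-buffering test with SeqCst everywhere.
/// Each thread sets its own flag and then reads the other thread's flag.
/// Returns what each thread saw of the other thread's flag.
pub fn store_buffering_round() -> (bool, bool) {
    let a = Arc::new(AtomicBool::new(false));
    let b = Arc::new(AtomicBool::new(false));

    let t1 = {
        let (a, b) = (Arc::clone(&a), Arc::clone(&b));
        thread::spawn(move || {
            a.store(true, Ordering::SeqCst);
            b.load(Ordering::SeqCst)
        })
    };
    let t2 = {
        let (a, b) = (Arc::clone(&a), Arc::clone(&b));
        thread::spawn(move || {
            b.store(true, Ordering::SeqCst);
            a.load(Ordering::SeqCst)
        })
    };

    (t1.join().unwrap(), t2.join().unwrap())
}
```

With SeqCst, all four operations fall into one total order, so at least one of the two loads must see `true`. With only Release stores and Acquire loads, the memory model would also allow both loads to return `false`.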

I stumbled upon an example of documentation where this has been explicitly documented. Look at once_cell::race#atomic-orderings:

All types in this module use Acquire and Release atomic orderings for all their operations. While this is not strictly necessary for types other than OnceBox, it is useful for users as it allows them to be certain that after get or get_or_init returns on one thread, any side-effects caused by the setter thread prior to them calling set or get_or_init will be made visible to that thread; without it, it’s possible for it to appear as if they haven’t happened yet from the getter thread’s perspective. […]

Maybe such a piece of documentation would be nice in Tokio's Notify as well.
