Can't understand this behaviour of tokio Notify when trying to "broadcast" with notify_waiters

I have an application where P producers need to periodically notify C consumers that something happened. C is usually 1, but could be more in the future.
Each consumer needs to do something once it has received a notification from every producer.
I know that I should probably use a semaphore for this, but I tried tokio's Notify instead (with .notify_waiters to "broadcast" to all consumers), and I'm getting behaviour I cannot really explain. Here is a reduced example with 2 producers and 1 consumer:

use std::sync::atomic::AtomicUsize;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::task::JoinSet;
use tokio::{sync::Notify, time};

const NUM_PRODUCERS: u32 = 2;
const NUM_CONSUMERS: u32 = 1;
static NOTIFICATIONS: AtomicUsize = AtomicUsize::new(0usize);

#[derive(Debug)]
struct Producer {
    id: u32,
    period: Duration,
    receivers_to_notify: Arc<Notify>,
    start: Instant,
}

impl Producer {
    fn new(start: Instant, id: u32, period: Duration, receivers_to_notify: Arc<Notify>) -> Self {
        Self {
            id,
            period,
            receivers_to_notify,
            start,
        }
    }

    async fn produce(&mut self) {
        loop {
            time::sleep(self.period).await;
            let elapsed = self.start.elapsed().as_secs_f64();

            println!("\t\t\t\t\tproducer {} notifying @{elapsed}", self.id);
            self.receivers_to_notify.notify_waiters();
            let n = NOTIFICATIONS.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
            if (n != 0) && ((n + 1) as u32 % NUM_PRODUCERS == 0) {
                println!("---------------------------------------------------------\n");
            }
        }
    }
}

#[derive(Debug)]
struct Consumer {
    id: u32,
    notifying_producers: Vec<Arc<Notify>>,
    start: Instant,
}

impl Consumer {
    fn new(start: Instant, id: u32, notifying_producers: Vec<Arc<Notify>>) -> Self {
        Self {
            start,
            id,
            notifying_producers,
        }
    }

    async fn consume(&self) {
        loop {
            // One `Notified` future per entry in `notifying_producers`.
            let notifiers: Vec<_> = self
                .notifying_producers
                .iter()
                .map(|n| n.notified())
                .collect();
            futures::future::join_all(notifiers).await;

            let elapsed = self.start.elapsed().as_secs_f64();
            println!(
                "Consumer {} received all @{elapsed}, total notifications={}",
                self.id,
                NOTIFICATIONS.load(std::sync::atomic::Ordering::SeqCst)
            );
        }
    }
}

#[tokio::main]
async fn main() {
    let mut notifying_producers = Vec::new();
    notifying_producers.resize(NUM_PRODUCERS as usize, Arc::new(Notify::new()));

    let mut consumer_tasks = JoinSet::new();

    let start = Instant::now();
    (0..NUM_CONSUMERS).for_each(|id| {
        let consumer = Consumer::new(start, id, notifying_producers.clone());
        consumer_tasks.spawn(async move { consumer.consume().await });
    });

    time::sleep(Duration::from_secs(1)).await;

    let mut producer_tasks = JoinSet::new();
    for (id, notifier) in notifying_producers.iter().enumerate() {
        let mut producer = Producer::new(
            start,
            id as u32,
            Duration::from_millis(3000),
            notifier.clone(),
        );
        producer_tasks.spawn(async move { producer.produce().await });
    }

    while (consumer_tasks.join_next().await).is_some() {}
    while (producer_tasks.join_next().await).is_some() {}
}

I cannot understand what causes an output like the following:

					producer 1 notifying @4.003277477
					producer 0 notifying @4.003359774
---------------------------------------------------------

Consumer 0 received all @4.003398938, total notifications=2 //<-- expected: consumer woke up after receiving 2 notifications
					producer 0 notifying @7.006186216
Consumer 0 received all @7.006263166, total notifications=3 //<-- unexpected:
                                                            // why did the consumer wake up? It "subscribed" to every producer's Notify,
                                                            // but join_all apparently resolved after just one notification
					producer 1 notifying @7.006283288
---------------------------------------------------------

Consumer 0 received all @7.006339447, total notifications=4 // <-- unexpected: again the consumer woke up after just one notification was sent

As written in the annotations, the first "round" is what I expected: the consumer's join_all resolved only after all producers had sent their notification. After that, however, it resolves even when just one notification has been sent.
I have probably totally misunderstood how notify_waiters is supposed to work, but I cannot see what I'm missing.
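
One thing that may explain the output: `Vec::resize` fills the vector with clones of the value it is given, and cloning an `Arc` only bumps the reference count. If that reading is right, every entry of `notifying_producers` points at the same `Notify`, so both `notified()` futures wait on one shared instance and a single `notify_waiters()` call completes them all. The sketch below checks just that part with the standard library. `FakeNotify`, `shared_via_resize` and `distinct_via_map` are hypothetical names used only for illustration, and `FakeNotify` is a stand-in for `tokio::sync::Notify`, not the real type.

```rust
use std::sync::Arc;

// Hypothetical stand-in for tokio::sync::Notify, so the sketch needs only std.
#[derive(Debug, Default)]
struct FakeNotify {
    _placeholder: u8,
}

/// Mirrors the construction used in `main` above:
/// `resize` clones the one Arc it is given, so every slot shares one allocation.
fn shared_via_resize(n: usize) -> Vec<Arc<FakeNotify>> {
    let mut v = Vec::new();
    v.resize(n, Arc::new(FakeNotify::default()));
    v
}

/// Possible alternative: build a fresh Arc (and a fresh value) per producer.
fn distinct_via_map(n: usize) -> Vec<Arc<FakeNotify>> {
    (0..n).map(|_| Arc::new(FakeNotify::default())).collect()
}

fn main() {
    let shared = shared_via_resize(2);
    let distinct = distinct_via_map(2);

    // `Arc::ptr_eq` reports whether two Arcs point at the same allocation.
    println!(
        "resize: same instance? {}",
        Arc::ptr_eq(&shared[0], &shared[1])
    );
    println!(
        "map:    same instance? {}",
        Arc::ptr_eq(&distinct[0], &distinct[1])
    );
}
```

If the real code built the vector the second way, each producer would get its own `Notify`. Note that `notify_waiters()` stores no permit, so a notification sent while the consumer is not currently waiting on that `Notify` can still be missed; a counting approach such as the semaphore mentioned above may be more robust for "wait for all producers".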
