I have an application in which P producers periodically need to notify C consumers that something happened. C is usually 1, but could be larger in the future.
Each consumer should act only once it has received a notification from every producer.
I know that I should probably use a semaphore for this, but I tried tokio's Notify (with .notify_waiters to "broadcast" to all consumers) and I'm seeing behavior I cannot explain. Here is a reduced example with 2 producers and 1 consumer:
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::task::JoinSet;
use tokio::{sync::Notify, time};
const NUM_PRODUCERS: u32 = 2;
const NUM_CONSUMERS: u32 = 1;
static NOTIFICATIONS: AtomicUsize = AtomicUsize::new(0usize);
#[derive(Debug)]
struct Producer {
    id: u32,
    period: Duration,
    receivers_to_notify: Arc<Notify>,
    start: Instant,
}

impl Producer {
    fn new(start: Instant, id: u32, period: Duration, receivers_to_notify: Arc<Notify>) -> Self {
        Self {
            id,
            period,
            receivers_to_notify,
            start,
        }
    }

    async fn produce(&mut self) {
        loop {
            time::sleep(self.period).await;
            let elapsed = self.start.elapsed().as_secs_f64();
            println!("\t\t\t\t\tproducer {} notifying @{elapsed}", self.id);
            self.receivers_to_notify.notify_waiters();
            let n = NOTIFICATIONS.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
            if (n != 0) && ((n + 1) as u32 % NUM_PRODUCERS == 0) {
                println!("---------------------------------------------------------\n");
            }
        }
    }
}

#[derive(Debug)]
struct Consumer {
    id: u32,
    notifying_producers: Vec<Arc<Notify>>,
    start: Instant,
}

impl Consumer {
    fn new(start: Instant, id: u32, notifying_producers: Vec<Arc<Notify>>) -> Self {
        Self {
            start,
            id,
            notifying_producers,
        }
    }

    async fn consume(&self) {
        loop {
            let notifiers: Vec<_> = self
                .notifying_producers
                .iter()
                .map(|n| {
                    let notified = n.notified();
                    notified
                })
                .collect();
            futures::future::join_all(notifiers).await;
            let elapsed = self.start.elapsed().as_secs_f64();
            println!(
                "Consumer {} received all @{elapsed}, total notifications={}",
                self.id,
                NOTIFICATIONS.load(std::sync::atomic::Ordering::SeqCst)
            );
        }
    }
}

#[tokio::main]
async fn main() {
    let mut notifying_producers = Vec::new();
    notifying_producers.resize(NUM_PRODUCERS as usize, Arc::new(Notify::new()));
    let mut consumer_tasks = JoinSet::new();
    let start = Instant::now();
    (0..NUM_CONSUMERS).for_each(|id| {
        let consumer = Consumer::new(start, id, notifying_producers.clone());
        consumer_tasks.spawn(async move { consumer.consume().await });
    });
    time::sleep(Duration::from_secs(1)).await;
    let mut producer_tasks = JoinSet::new();
    for (id, notifier) in notifying_producers.iter().enumerate() {
        let mut producer = Producer::new(
            start,
            id as u32,
            Duration::from_millis(3000),
            notifier.clone(),
        );
        producer_tasks.spawn(async move { producer.produce().await });
    }
    while (consumer_tasks.join_next().await).is_some() {}
    while (producer_tasks.join_next().await).is_some() {}
}

I cannot understand what causes an output like the following:
producer 1 notifying @4.003277477
producer 0 notifying @4.003359774
---------------------------------------------------------
Consumer 0 received all @4.003398938, total notifications=2 //<-- expected: consumer woke up after receiving 2 notifications
producer 0 notifying @7.006186216
Consumer 0 received all @7.006263166, total notifications=3 //<-- unexpected:
// why consumer woke up? It "subscribed" to all producers' Notifys,
// but join_all apparently resolved after just one notification
producer 1 notifying @7.006283288
---------------------------------------------------------
Consumer 0 received all @7.006339447, total notifications=4 // <-- unexpected: again consumer woke up after just one notification sent
As the annotations say, the first "round" is what I expected: the consumer's join_all resolved only after all producers had sent their notification.
After that, however, it resolves after just one notification has been sent.
I have probably totally misunderstood how notify_waiters
is supposed to work, but I cannot see what I'm missing.
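One thing I have not ruled out is whether the Notify handles built in main are really distinct objects, since the vector is filled with Vec::resize, which clones the single value it is given. Below is a possible std-only diagnostic sketch, not part of the program above. The helper names (all_distinct, build_with_resize, build_one_per_element) are made up for illustration, and a plain Arc<u32> stands in for Arc<Notify>:

```rust
use std::sync::Arc;

/// Hypothetical helper: true if no two handles point to the same allocation.
fn all_distinct<T>(handles: &[Arc<T>]) -> bool {
    for (i, a) in handles.iter().enumerate() {
        for b in &handles[i + 1..] {
            if Arc::ptr_eq(a, b) {
                return false;
            }
        }
    }
    true
}

/// Builds the vector the way `main` above does: `resize` clones one `Arc`.
/// (Assumption: `u32` stands in for `Notify` to keep the sketch std-only.)
fn build_with_resize(n: usize) -> Vec<Arc<u32>> {
    let mut v = Vec::new();
    v.resize(n, Arc::new(0u32));
    v
}

/// Builds the vector by calling `Arc::new` once per element.
fn build_one_per_element(n: usize) -> Vec<Arc<u32>> {
    (0..n).map(|_| Arc::new(0u32)).collect()
}

fn main() {
    // prints "resize: distinct = false"
    println!("resize: distinct = {}", all_distinct(&build_with_resize(2)));
    // prints "per-element: distinct = true"
    println!(
        "per-element: distinct = {}",
        all_distinct(&build_one_per_element(2))
    );
}
```

If the same check reported false for notifying_producers, both producers would be signalling one shared Notify, which might be related to what I am seeing, though I have not confirmed that it explains the whole output.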