tokio::sync::Notify seems to be broken

I am trying to build a task pool that relies heavily on the notification primitives in tokio::sync.
My code looks like this:

use std::sync::{
    atomic::{AtomicUsize, Ordering},
    Arc,
};

use futures::future::BoxFuture;
use tokio::sync::{Mutex, Notify};

struct Pool<T> {
    jobs: Arc<Mutex<Vec<BoxFuture<'static, T>>>>,
    limit: usize,
    num_active_jobs: Arc<AtomicUsize>,
    notifier: Arc<Notify>,
    finish_notifier: Arc<Notify>,
}

impl<T: 'static> Pool<T> {
    pub async fn new(limit: usize) -> Self {
        let mut pool = Self {
            jobs: Default::default(),
            limit,
            num_active_jobs: Arc::new(AtomicUsize::new(0)),
            notifier: Arc::new(Notify::new()),
            finish_notifier: Arc::new(Notify::new()),
        };
        pool.start().await;
        pool
    }
    pub async fn add(&mut self, f: BoxFuture<'static, T>) {
        self.jobs.lock().await.push(f);
        self.num_active_jobs.fetch_add(1, Ordering::Relaxed);
        self.notifier.notify_one();
    }
    async fn start(&mut self) {
        for _ in 0..self.limit {
            let n = self.notifier.clone();
            let jobs = Arc::downgrade(&self.jobs);
            let f = self.finish_notifier.clone();
            tokio::spawn(async move {
                loop {
                    n.notified().await;
                    if let Some(jobs) = jobs.upgrade() {
                        let job = jobs.lock().await.pop();
                        if let Some(job) = job {
                            job.await;
                            println!("down");
                            f.notify_one();
                        };
                    } else {
                        break;
                    }
                }
            });
        }
    }
    pub async fn wait(&self) -> () {
        loop {
            self.finish_notifier.notified().await;
            println!("in");
            if 1 == dbg!(self.num_active_jobs.fetch_sub(1, Ordering::Relaxed)) {
                return ();
            }
        }
    }
    // async fn done(&mut self) -> {
    //     self.num_active_jobs-=1;
    //
    // }
}

#[tokio::main]
async fn main() {
    let mut pool = Pool::<()>::new(1).await;
    for i in 0..10u8 {
        pool.add(Box::pin(async move {
            dbg!(i);
        }))
        .await;
    }
    pool.wait().await;
}

I only get the first log line, and then it just blocks.

First of all, why are you building a task pool? Spawning a task in Tokio is very cheap, so there is almost no point in conserving them. If you want to limit the number of concurrent operations, you can use a Semaphore or buffer_unordered.

I think the problem occurs because the spawned worker, after executing the first job, goes back to n.notified().await. By that time, all the add calls have already finished, so Notify will not receive any further notifications.

A simple fix is to consume and run as many jobs as possible from the queue, and only call n.notified().await once it's empty. However, that will still be buggy, because another task could add a job to the queue and issue a notification in between the worker observing the queue as empty and going to sleep. The worker would then sleep forever because there would be nothing to wake it up. A proper fix would require some more effort, so it's worth considering whether you need this at all before doing that.

Yes, it never gets past line 41 (the n.notified().await call in the worker loop) for the same reason that the code below never exits:

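use tokio::sync::Notify;

#[tokio::main]
async fn main() {
    let not = Notify::new();
    // Two notifications are sent, but Notify stores at most one permit.
    not.notify_one();
    not.notify_one();
    // Consumes the single stored permit.
    not.notified().await;
    println!("Hi!");
    // No permit is left, so this waits forever.
    not.notified().await;
    println!("This never runs");
}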

So this is a bug, isn't it?

No, this is the intended behavior.

The thing that remembers how many calls you've made is called a Semaphore.

This is the behaviour in the case of broadcast channels as well. It simply starts blocking the consumer once the buffer limit is reached.

For example, using a semaphore:

use tokio::sync::Semaphore;

#[tokio::main]
async fn main() {
    let not = Semaphore::new(0);
    not.add_permits(1);
    not.add_permits(1);
    not.acquire().await.unwrap().forget();
    println!("Hi!");
    not.acquire().await.unwrap().forget();
    println!("This will run!");
}
