Block until # of running async tasks in group change

I often find it useful to have a primitive for tracking the # of running async tasks in a custom 'task group'. Currently, I am using something like this:

pub struct Async_Counter {
    data: Arc<AtomicU64>,}

impl Async_Counter {
    pub fn cur_val(&self) -> u64 {
        self.data.load(Ordering::Relaxed)}

    pub fn grab(&self) -> Async_Counter_Grab {
        Async_Counter_Grab::new(self.data.clone())}}

pub struct Async_Counter_Grab {
    data: Arc<AtomicU64>,}

impl Async_Counter_Grab {
    pub fn new(data: Arc<AtomicU64>) -> Async_Counter_Grab {
        data.fetch_add(1, Ordering::Relaxed);
        Async { data}}}

impl Drop for Async_Counter_Grab {
    fn drop(&mut self) {
        let x = self.data.fetch_sub(1, Ordering::Relaxed);}}

One thing I am missing form this I would like to add: async block until # of tasks change ?

A custom use would be something like this: I want to have 20 tasks running a given service; sometimes these tasks nay exist with Result<_, Err>. When this happens, I want to restart another async task. This would be something like:

let c = Async_Counter::new();
loop {
  if c.cur_val() < 20 {
    // start a task
  } else {
    // missing part: how do I async block until the counter changes ?
  }
}

Sounds like you need to add some sort of waker to the Async_Counter which gets triggered every time you grab() it or the Async_Counter_Grab is dropped. That would let you give the Async_Counter an async fn wait_for_change(&self) method.

The futures::task::AtomicWaker type might help you implement something like that.

1 Like

How about tokio::sync::Semaphore?

2 Likes

@Michael-F-Bryan : Thanks, AtomicWaker definitely solves the problem as stated; though I could not figure out how to get register to handle multiple waiters if I ever needed to expand the problem.

@Alice: Yeah, looks like I just 'reinvented' the semaphore.