A poor man's async adaptive barrier

Hi everyone,

I found myself needing an adaptive async barrier and came up with the following code. Given how hard it is to reason about concurrent code, I wanted to check two things with you:

  1. Is it indeed sound? I think it is, provided one accepts that panics do not poison the barrier.
  2. Do you have any idea how much of a performance hit I am incurring compared to a hypothetical "ideal" implementation?

Thanks

The code

use tokio::sync::broadcast::{Sender, error::RecvError, channel};

#[derive(Debug, Clone, Copy)]
enum Empty {}

#[derive(Debug, Clone)]
pub(crate) struct Barrier {
    inner: Sender<Empty>,
}

impl Barrier {
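    /// Completes once every clone of this barrier has either called `wait` or been dropped.
    pub(crate) async fn wait(self) {
        // Subscribe first, then give up our own sender handle.
        let mut receiver = self.inner.subscribe();
        drop(self.inner);
        match receiver.recv().await {
            // `Empty` has no values, so nothing can ever be sent (or lagged behind).
            Ok(_) => unreachable!(),
            Err(RecvError::Lagged(_)) => unreachable!(),
            // The channel closes when the last sender is dropped: everyone has arrived.
            Err(RecvError::Closed) => (),
        }
    }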

    pub(crate) fn new() -> Self {
        Self {
            inner: channel(1).0
        }
    }
}

Is there some specific use case that doesn't fit well with tokio::sync::Barrier?

Yes!

In my case I can't know in advance the number of tasks that will need to be synchronized. At runtime, I launch several tasks that each query an online API and obtain a list of subsequent actions to perform. Pre-processing must be complete for all of those actions before moving on, hence the need for an adaptive barrier.

The only way to use tokio::sync::Barrier would be to:

  1. obtain in each task the list of actions to perform
  2. synchronize and establish the total number of actions to do
  3. create and share a barrier with the right number of waiting slots

It looks correct to me.
