Feedback on Lock-free Event-Bus Library

Hi everyone,

My library is eventador-rs, a lock-free pub/sub event-bus with both sync and async APIs. It's architecturally inspired by the LMAX Disruptor pattern.

I've been working on this project for a little while now. It started as an experiment on whether I could build a lock-free event-bus, and now that I have, I want to perfect it.

Aside from the possible logic bug or race condition that I may have missed, I'm particularly concerned with 3 things:

  1. Calling block_on inside a poll_x function

    As I mentioned in the linked issue, I'm told that calling block_on within a poll_x function is bad practice. In this case, I'm not sure how to resolve the call to an async function without blocking on it. Advice would be appreciated, as this is not the first time I've run into this issue.

  2. Correct use of memory orderings in these files

    Building your own lock-free data structures is usually discouraged because an inexperienced developer can misuse memory orderings and cause wacky behavior. I'm quite inexperienced in this regard, and would not be surprised if I misused the memory orderings here. However, I'd like to learn where I'm misusing them and why, so that these concepts don't remain a black box to me.

  3. Benchmarking

    Benchmarking concurrent data structures is quite new to me and I'm not quite sure what a fair test environment would look like, what a fair test even is, etc. Guidance on this matter would be appreciated as well.


You can find a quick summary of the overall architecture of the project here: Architecture.md.

I'm also open to contributions to improve this code, as I'll certainly be able to go over the changes and learn better practices that way as well.

Any advice or criticism would be appreciated! Thanks in advance :)

I had a look at the block_on call you linked, and by far the easiest way to fix it is to use the async-stream crate: wrap the call to self.ring.async_next in a Stream, then call poll_next on that stream.

I recently described the alternatives to doing stuff like this with async-stream in this post, in particular the lines at the end, which I will post again here:

Of course, an alternate approach is to not use poll-based methods at all, instead finding a method where you can just call async methods directly.

The async-stream crate made solving Issue #1 a breeze. Thanks, this helped a lot!

You can see my changes here.

Ah... That's not really how you do it. You are creating a new stream and destroying it on every call. Normally you have to keep the stream around from poll to poll, since its in-progress state is typically torn down when you drop it (though that depends on what async_next does).

If what you showed me works, there are even simpler ways.

One way this would break: if you reach one of the sleep calls inside the method, the next poll will start over from the very beginning of async_next.
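
To make the difference concrete, here is a minimal sketch using only the standard library. SlowNext is a hypothetical stand-in for async_next (I don't know what the real one does internally): it returns Pending twice, as if it had hit a sleep, and only then yields a value. The value 42 and the helper names are made up for illustration.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for `async_next`: stays pending for the first
/// `pending_polls` polls, then yields a sequence number.
struct SlowNext {
    pending_polls: u32,
}

impl Future for SlowNext {
    type Output = u64;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<u64> {
        if self.pending_polls == 0 {
            return Poll::Ready(42);
        }
        // Remember the progress made so far inside the future itself.
        self.pending_polls -= 1;
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}

/// A waker that does nothing; enough to poll by hand in this sketch.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

fn noop_waker() -> Waker {
    Waker::from(Arc::new(NoopWake))
}

/// Builds a fresh future on every poll, so its progress is dropped each time.
fn poll_recreating(max_polls: u32) -> Option<u64> {
    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);
    for _ in 0..max_polls {
        let mut fut = Box::pin(SlowNext { pending_polls: 2 });
        if let Poll::Ready(value) = fut.as_mut().poll(&mut cx) {
            return Some(value);
        }
    }
    None
}

/// Keeps one future alive across polls, so it eventually completes.
fn poll_persistent(max_polls: u32) -> Option<u64> {
    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(SlowNext { pending_polls: 2 });
    for _ in 0..max_polls {
        if let Poll::Ready(value) = fut.as_mut().poll(&mut cx) {
            return Some(value);
        }
    }
    None
}
```

With these definitions, poll_recreating never gets past the first Pending no matter how many times it polls, while poll_persistent completes on its third poll. A stream rebuilt inside every poll_flush call behaves like the first function.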

I see what you mean, and that the current implementation could mean that the call to poll_flush or poll_close may be eternally pending.

I'm trying to fix this the way you suggest and keep the stream around between calls to poll, by storing the async-stream as a field in the struct. However, I'm running into issues with type-safety.

  1. I'm not sure what type to store as a field
  2. When should I call pin_mut?

pub struct AsyncPublisher<T> {
    ring: Arc<RingBuffer>,
    buffer_size: usize,
    events: Vec<T>,

    // I'm not sure what type goes here?
    sequence_stream: AsyncStream<u64, impl futures::Future<Output = 64>>, 
}

impl<T: 'static + Send + Sync + Unpin> AsyncPublisher<T> {
    pub(crate) fn new(ring: Arc<RingBuffer>, buffer: usize) -> Self {
        let buffer = if buffer == 0 { buffer + 1 } else { buffer };

        let stream_ring = ring.clone();
        let stream = stream! {
            yield stream_ring.async_next().await;
        };

        // Do I call pin_mut here? Or later, when reading from the stream?
        // pin_mut!(stream);

        Self {
            ring,
            buffer_size: buffer,
            events: Vec::with_capacity(buffer),
            sequence_stream: stream,
        }
    }
}

impl<T: 'static + Send + Sync + Unpin> Sink<T> for AsyncPublisher<T> {
    ...

    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        while !self.events.is_empty() {

            // Do I mutably borrow the field here and then call pin_mut?
            // let stream = &mut self.sequence_stream;
            // pin_mut!(stream);

            match stream.poll_next(cx) {

            ...
        }
    }

    ...
}

The error "impl Trait not allowed outside of function and inherent method return types" is really throwing me off, and I'm not sure how to resolve it.

You should use the following type:

Pin<Box<dyn Stream<Item = u64> + Send + Sync>>

To create the pinned box, call the Box::pin method like this:

let stream = Box::pin(stream! {
    yield stream_ring.async_next().await;
});

As for pin_mut!, that is not needed when you use Box::pin. You can just call

self.sequence_stream.as_mut().poll_next(cx)

in your poll method.

Note: I guessed that u64 is the item type based on your code. If that is not the case, adjust it in the type as appropriate.
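
Here is a rough, self-contained sketch of the same pattern. Since Stream is not in the standard library, it stores a boxed std Future and calls poll instead of a boxed Stream and poll_next; the field type and the as_mut() call work the same way. Publisher, next_sequence, poll_sequence, poll_once, and the value 7 are all hypothetical names for illustration, not part of your crate.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for the publisher; only the boxed field matters.
struct Publisher {
    // A trait object gives the anonymous `async` block a nameable type,
    // which is what `impl Trait` cannot do in a struct field.
    next_sequence: Pin<Box<dyn Future<Output = u64> + Send>>,
}

impl Publisher {
    fn new() -> Self {
        Self {
            // `Box::pin` pins the future on the heap, so no `pin_mut!` is
            // needed, and the struct itself stays `Unpin`.
            next_sequence: Box::pin(async { 7u64 }),
        }
    }

    /// Polls the stored future in place; it is kept between calls.
    /// Note: an `async` block must not be polled again after it is ready.
    fn poll_sequence(&mut self, cx: &mut Context<'_>) -> Poll<u64> {
        self.next_sequence.as_mut().poll(cx)
    }
}

/// A waker that does nothing; enough to poll by hand in this sketch.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls the publisher once with a no-op waker.
fn poll_once(publisher: &mut Publisher) -> Poll<u64> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    publisher.poll_sequence(&mut cx)
}
```

Inside a real poll_flush you would pass along the cx you were given rather than building a no-op waker; poll_once only exists so the sketch can be exercised without an executor.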

That works like a charm. Thanks!

You're welcome. I encourage you to open another question for any other poll methods you have trouble removing the block_on from.
