Lock-free Bounded Non-Blocking Pub-Sub Queue

Hello Rustaceans, I'm Filip, and I've been dabbling with Rust for the past month or so. Here's my first contribution: a Lock-free Bounded Non-Blocking Pub-Sub Queue.

The idea behind this queue is a pub-sub pattern where the size is bounded and the publisher is never blocked. Subscribers will lose data if they can't keep up with the publisher, but they never block it. There are three versions: a bare-bones queue with only broadcast and try_recv methods, a Sync version with blocking recv() and recv_timeout(), and an Async version with the Sink and Stream traits implemented.

All of the versions use their respective channel functions to create a (Publisher, Subscriber) tuple, and Subscribers implement the Clone trait so that more subscribers can be added.

Comments, rants and suggestions are very welcome.


I would like to start off with this: I don't know much about lock-free programming, so I can't comment on that. But I did find a few other things while reading your code that could be clearer, or where you could use some handy std lib functionality.

1) These clones in BareSubscriber are unnecessary
2) Here using Option::unwrap to signal intent can get rid of the match and clear up the code
Unwrap should be used to signal that your code has an invariant that implies that the Option can only be Some, which is the case here.
3) (a, b, c) Here you can use Result::ok to convert to an Option instead of the match
4) This clone is unnecessary
5) You can use Result::map to do the conversion

self.bare_publisher.broadcast(item).map(|_| AsyncSink::Ready)
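To illustrate suggestions 3 and 5, here is a minimal sketch of the match-versus-combinator difference. The names `QueueError`, `SendOutcome`, and the `bool`/`Option` parameters are hypothetical stand-ins, not the crate's actual types (`SendOutcome::Ready` plays the role of `AsyncSink::Ready`).

```rust
// Hypothetical stand-ins for the crate's error type and for AsyncSink.
#[derive(Debug, PartialEq)]
struct QueueError;

#[derive(Debug, PartialEq)]
enum SendOutcome {
    Ready,
}

// Stand-in for a fallible receive: Err when nothing is available.
fn try_recv(slot: Option<i32>) -> Result<i32, QueueError> {
    slot.ok_or(QueueError)
}

// Verbose form: a match that only converts Result into Option.
fn recv_with_match(slot: Option<i32>) -> Option<i32> {
    match try_recv(slot) {
        Ok(value) => Some(value),
        Err(_) => None,
    }
}

// Same behavior using Result::ok.
fn recv_with_ok(slot: Option<i32>) -> Option<i32> {
    try_recv(slot).ok()
}

// Stand-in for a fallible broadcast that returns no value on success.
fn broadcast(accept: bool) -> Result<(), QueueError> {
    if accept {
        Ok(())
    } else {
        Err(QueueError)
    }
}

// Result::map changes the Ok value and passes the Err through untouched.
fn start_send(accept: bool) -> Result<SendOutcome, QueueError> {
    broadcast(accept).map(|_| SendOutcome::Ready)
}
```

Both `recv_*` functions return the same values for every input; the combinator version just states the conversion directly.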

Some questions

  1. What does wi mean in BarePublisher and BareSubscriber?
    a) Similarly, what does ri mean in BareSubscriber?

I know that this is a bit of a nitpick, but on your examples in your readme and your crate description you made a typo at the bottom:

let publisher = stream::iter_ok(vec![1, 2, 3, 3, 5])

and then you compare it to

    rx.map(|x| *x).collect().wait().unwrap(),
    vec![1, 2, 3, 4, 5]

Thanks for the detailed analysis. I will definitely look into it, and fix/refactor the things you mentioned.
wi (write index) and ri (read index) are indexes used inside the circular buffer. The indexes themselves are incremented beyond the size of the queue, but when accessing elements they use % size to never go out of bounds, and basically go around in a circle.

I'm currently working on more detailed documentation of how the queue works internally. But the basics are a ring buffer based on a fixed-size vector. The publisher writes data at its index, which is calculated as wi % size, while subscribers read data from their index, calculated as ri % size. It's up to the subscribers to figure out whether the publisher has looped around them by checking wi > ri + size, and then updating their index to wi - size, which is the oldest written data.
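The index arithmetic described above can be sketched as follows. This is a single-threaded illustration only: it assumes plain `usize` counters and a `Vec<Option<T>>`, whereas the actual crate is lock-free and would need atomics. The `Ring` and `Reader` types are hypothetical names, not the crate's API.

```rust
/// Single-threaded sketch of the ring buffer; not the crate's implementation.
struct Ring<T> {
    buf: Vec<Option<T>>,
    size: usize,
    wi: usize, // write index, grows without wrapping
}

impl<T: Clone> Ring<T> {
    fn new(size: usize) -> Self {
        assert!(size > 0, "ring must hold at least one item");
        Ring { buf: vec![None; size], size, wi: 0 }
    }

    /// The publisher never blocks: it overwrites the oldest slot.
    fn broadcast(&mut self, item: T) {
        let slot = self.wi % self.size;
        self.buf[slot] = Some(item);
        self.wi += 1;
    }
}

/// Each subscriber keeps its own read index.
struct Reader {
    ri: usize,
}

impl Reader {
    fn try_recv<T: Clone>(&mut self, ring: &Ring<T>) -> Option<T> {
        // The publisher has lapped this reader: skip to the oldest live item.
        if ring.wi > self.ri + ring.size {
            self.ri = ring.wi - ring.size;
        }
        // Nothing new has been written since the last read.
        if self.ri == ring.wi {
            return None;
        }
        let item = ring.buf[self.ri % ring.size].clone();
        self.ri += 1;
        item
    }
}
```

With a size of 3 and five items broadcast before the first read, a reader starting at ri = 0 is moved to ri = 2 and receives only the third, fourth, and fifth items.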

Love nitpicks, please share as many as you find :slight_smile:
Yeah, I was testing if the assertion would fail when tested against a wrong vector, and forgot to set it back to the correct vector.


I suggest using clippy to help find small improvements to clarity and performance. It is very nice.


Will look into incorporating it, thanks for the advice.

Your suggestions have been implemented in commit "implement code readability", which should be merged to master within a day or so.
