Hello Rustaceans, I'm Filip, and I've been dabbling with Rust for the past month or so. Here's my first contribution: a lock-free, bounded, non-blocking pub-sub queue.
The idea behind this queue is to provide the pub-sub pattern with a bounded size, while the publisher is never blocked. Subscribers will lose data if they can't keep up with the publisher, but they never block it. Three versions are implemented: a bare-bones queue with only the broadcast and try_recv methods, a sync version with blocking recv() and blocking recv_timeout, and an async version with the Sink and Stream traits implemented.
All of the versions use their respective channel functions to create a (Publisher, Subscriber) tuple, and Subscribers implement the Clone trait so that more subscribers can be added.
Comments, rants and suggestions are very welcome.
I would like to start off with this: I don't know much about lock-free programming, so I can't comment on that. But I did find a few other things while reading your code that could be clearer, or where you could use some handy std lib functionality:
1) These clones in BareSubscriber are unnecessary.
2) Here, using Option::unwrap to signal intent can get rid of the match and clear up the code. Unwrap should be used to signal that your code has an invariant that implies that the Option can only be Some, which is the case here.
3) (a, b, c) Here you can use Result::ok to convert to an Option instead of the match.
4) This clone is unnecessary.
5) You can use Result::map to do the conversion.
- what does wi mean in
a) similarly what does ri mean in
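To illustrate points 2, 3 and 5 in general terms, here is a minimal sketch. It is not taken from the crate: parse, parse_opt, parse_doubled and first_of_non_empty are hypothetical names made up for the example, and only the std methods Result::ok, Result::map and Option::unwrap are the point.

```rust
use std::num::ParseIntError;

// Hypothetical fallible helper, used only for illustration.
fn parse(input: &str) -> Result<i32, ParseIntError> {
    input.trim().parse::<i32>()
}

// Verbose form: a match whose only job is to turn a Result into an Option.
fn parse_opt_verbose(input: &str) -> Option<i32> {
    match parse(input) {
        Ok(value) => Some(value),
        Err(_) => None,
    }
}

// Same behavior with Result::ok, which discards the error.
fn parse_opt(input: &str) -> Option<i32> {
    parse(input).ok()
}

// Result::map converts the Ok value and passes any Err through unchanged.
fn parse_doubled(input: &str) -> Result<i64, ParseIntError> {
    parse(input).map(|value| i64::from(value) * 2)
}

// Assumed invariant for this example: callers never pass an empty slice,
// so `first()` can only be Some and unwrap documents that intent.
fn first_of_non_empty(items: &[i32]) -> i32 {
    *items.first().unwrap()
}
```

If the invariant behind an unwrap is not obvious from the surrounding code, Option::expect with a short message is a common alternative that states it explicitly.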
I know that this is a bit of a nitpick, but on your examples in your readme and your crate description you made a typo at the bottom:
let publisher = stream::iter_ok(vec![1, 2, 3, 3, 5])
and then you compare it to
vec![1, 2, 3, 4, 5]
Thanks for the detailed analysis. I will definitely look into it, and fix/refactor the things you mentioned.
wi (write index) and ri (read index) are indexes used inside the circular buffer. The indexes themselves are incremented beyond the size of the queue, but when accessing elements they are taken % size so they never go out of bounds, and basically go around in a circle.
I'm currently working on more detailed documentation on how the queue works internally. But the basics are a ring buffer based on a fixed-size vector. The publisher writes data into its index, which is calculated as wi % size, while subscribers read data from their index, calculated as ri % size. It's up to the subscribers to figure out if the publisher has looped around them by checking wi > ri + size, and then updating their index to wi - size, which is the oldest written data.
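The index arithmetic described above can be sketched roughly as follows. This is a single-threaded illustration only, not the crate's lock-free implementation: Ring, Reader and their fields are hypothetical names, synchronization is left out entirely, and overflow of the ever-growing indexes is ignored.

```rust
// Single-threaded sketch of the index arithmetic; not lock-free.
struct Ring<T> {
    slots: Vec<Option<T>>,
    size: usize,
    wi: usize, // write index: number of items written so far, never wrapped
}

impl<T: Clone> Ring<T> {
    fn new(size: usize) -> Self {
        assert!(size > 0, "size must be non-zero");
        Ring { slots: vec![None; size], size, wi: 0 }
    }

    // The publisher never waits: it overwrites the slot at wi % size.
    fn broadcast(&mut self, item: T) {
        let slot = self.wi % self.size;
        self.slots[slot] = Some(item);
        self.wi += 1;
    }
}

struct Reader {
    ri: usize, // read index: next item this subscriber wants, never wrapped
}

impl Reader {
    fn try_recv<T: Clone>(&mut self, ring: &Ring<T>) -> Option<T> {
        // The publisher has lapped this reader: skip to the oldest item
        // that is still in the buffer.
        if ring.wi > self.ri + ring.size {
            self.ri = ring.wi - ring.size;
        }
        // Nothing new has been written yet.
        if self.ri == ring.wi {
            return None;
        }
        let item = ring.slots[self.ri % ring.size].clone();
        self.ri += 1;
        item
    }
}
```

With a size of 3 and the values 1 through 5 broadcast before the reader runs, the reader skips the overwritten 1 and 2 and receives 3, 4 and 5, then None.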
Love nitpicks, please share as many as you find
Yeah, I was testing if the assertion would fail when tested against a wrong vector, and forgot to set it back to the correct vector.
I suggest using clippy to help find small improvements to clarity and performance; it is very nice.
Will look into incorporating it, thanks for the advice.
Your suggestions have been implemented in commit "implement code readability", which should be merged to master within a day or so.