Converting a thread-safe lock-free queue from C++

Hi all!

I'm new to the Rust world and started learning it only a few weeks ago. I came here from C++, and despite all my programming experience it was not so easy to start.
To get a feeling for what Rust is, I decided to rewrite my queue algorithm (GitHub). You can also see some description and discussion (check the GitHub page for the link).

In simple words, the idea is the following. We have a queue that consists of 2 subqueues: one for the writer thread and another for the reader thread. The writer can freely write to its subqueue and the reader can read from its own, and they don't interfere with each other. Each time after adding a new value to its subqueue, the writer checks whether the reader still has something to read; if it doesn't, the writer gives the reader its subqueue and starts an empty one for itself.
There is one case when the writer finishes writing but the reader still has something to read. In such a situation the writer sets a special flag that shows that it has stopped writing; the reader checks this flag when its subqueue is empty and reuses the writer's subqueue to get the remaining data.
This algorithm works for a configuration with one writer thread and one reader thread, and for synchronization it requires only 1 atomic, the one holding the reader's subqueue head.
For multiple writers and readers there is a Guard wrapper that protects writers and readers using mutex locks. In this case we lock only the writers' group or only the readers' group, and those groups don't block each other.
I tested my algorithm with a simple test and with Relacy Race Detector, and it looks like it works correctly.
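
The Guard idea described above (one lock for the writers' group, another for the readers' group) could look roughly like this in Rust. This is only a sketch under assumptions: `GuardSketch`, `push`, `pop` and `guard_demo` are hypothetical names, and a standard `mpsc` channel stands in for the two-subqueue pipe, so it shows the locking layout rather than the actual queue.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical wrapper; the mpsc channel is a stand-in for the real pipe.
struct GuardSketch<T> {
    writer: Mutex<Sender<T>>,   // locked only by the writers' group
    reader: Mutex<Receiver<T>>, // locked only by the readers' group
}

impl<T> GuardSketch<T> {
    fn new() -> Self {
        let (tx, rx) = channel();
        GuardSketch {
            writer: Mutex::new(tx),
            reader: Mutex::new(rx),
        }
    }

    /// Writers contend only with other writers.
    fn push(&self, value: T) {
        let tx = self.writer.lock().unwrap();
        // Cannot fail here: the receiver lives in the same struct.
        tx.send(value).unwrap();
    }

    /// Readers contend only with other readers; None means "empty right now".
    fn pop(&self) -> Option<T> {
        let rx = self.reader.lock().unwrap();
        rx.try_recv().ok()
    }
}

/// Four writer threads push 100 values each, then two reader threads
/// drain the queue. Returns how many values the readers saw in total.
fn guard_demo() -> usize {
    let guard = Arc::new(GuardSketch::new());

    let writers: Vec<_> = (0..4)
        .map(|id| {
            let guard = Arc::clone(&guard);
            thread::spawn(move || {
                for n in 0..100 {
                    guard.push(id * 100 + n);
                }
            })
        })
        .collect();
    for w in writers {
        w.join().unwrap();
    }

    let readers: Vec<_> = (0..2)
        .map(|_| {
            let guard = Arc::clone(&guard);
            thread::spawn(move || {
                let mut seen = 0usize;
                while guard.pop().is_some() {
                    seen += 1;
                }
                seen
            })
        })
        .collect();
    readers.into_iter().map(|r| r.join().unwrap()).sum()
}
```

Because each side has its own `Mutex`, a writer holding the writer lock never makes a reader wait, which is the property the Guard wrapper is after.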

Now I'm trying to rewrite it in Rust (GitHub).
I was using the LinkedList and mpsc::Queue structures from the Rust standard library as examples of how to write my queue, but I got stuck on understanding how to use AtomicPtr.
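
For reference, a minimal sketch of the ownership hand-off that `AtomicPtr` is typically used for: a `Box` is turned into a raw pointer, published through the atomic, and later taken back with `swap`. The function name `publish_and_take` is made up for illustration and is not from the linked repository.

```rust
use std::ptr;
use std::sync::atomic::{AtomicPtr, Ordering};

/// Publishes a boxed value through an AtomicPtr, takes it back with swap,
/// and reports (taken value, whether the slot is null afterwards).
fn publish_and_take() -> (Option<i32>, bool) {
    // An empty slot is represented by a null pointer.
    let slot: AtomicPtr<i32> = AtomicPtr::new(ptr::null_mut());

    // Give up ownership of the Box and publish the raw pointer.
    let raw = Box::into_raw(Box::new(42));
    slot.store(raw, Ordering::Release);

    // swap(null) atomically takes whatever is stored, so at most one
    // caller can ever receive a given non-null pointer.
    let taken = slot.swap(ptr::null_mut(), Ordering::AcqRel);
    let value = if taken.is_null() {
        None
    } else {
        // SAFETY: `taken` came from Box::into_raw above and the swap made
        // us its only owner, so rebuilding the Box is sound.
        let boxed = unsafe { Box::from_raw(taken) };
        Some(*boxed)
    };

    let now_empty = slot.load(Ordering::Acquire).is_null();
    (value, now_empty)
}
```

The only `unsafe` needed in this pattern is the `Box::from_raw` call that reclaims ownership; everything else goes through the safe `AtomicPtr` API.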

Can someone help me fix my implementation?
I'm trying to avoid unsafe blocks as much as possible, but I'm not sure if I can write the code completely without them.
Also, I have a feeling that my approach is not the best one, so I would appreciate it if someone has a better idea.

Thanks in advance.


Wow, writing lock-free code is tough, especially in a non-garbage-collected language. It may not be the best exercise for learning Rust. Have you considered writing something concurrent but not lock-free first?

To learn how lock-freedom can be done in Rust, you can read this blog post: Lock-freedom without garbage collection · Aaron Turon

The source code of crossbeam is also worth reading: https://github.com/aturon/crossbeam

This one, Fearless Concurrency with Rust | Rust Blog, was written earlier and may be helpful as well.


Hi again!

I have updated my implementation of Pipe so that it now compiles (see the GitHub link in my first post). I used mpsc_queue.rs as an example to do this.
The algorithm was slightly changed to prevent data from getting stuck in the writer's subqueue in some cases. I'm now using an additional atomic pointer for the writer's head. A description of the new algorithm is below.

Writing to the queue:

  1. Exclusively retrieve the writer's head using atomic::swap(null). (This prevents the reader from trying to take ownership of the writer's subqueue.)
  2. If it is null, create a new head.
  3. Otherwise, add the data to the writer's tail.
  4. Retrieve the reader's head using atomic::load(). (Using load instead of swap avoids blocking the reader's subqueue.)
  5. If it is null, set it to the writer's head.
  6. Otherwise, restore the writer's head.

Reading from the queue:

  1. Retrieve the reader's head using atomic::load(). (Using load instead of swap prevents the writer from overwriting the reader's subqueue while the reader is working with it.)
  2. If it is null, then:
    2.1. Exclusively retrieve the writer's head using atomic::swap(null). (Using swap guarantees that only the writer or the reader owns the writer's subqueue at any moment.)
    2.2. If it is not null, assign it to the reader's head; otherwise exit, returning None.
  3. Shift the reader's head to the next element and return the original head's data.
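
The write and read steps above can be traced with a small sketch. It rests on several assumptions: `PipeSketch` is a hypothetical name, each subqueue is a boxed `VecDeque` rather than the linked list in the real code, and the type is deliberately kept `!Send`/`!Sync`, so it only follows the steps on one thread and makes no claim about concurrent interleavings.

```rust
use std::collections::VecDeque;
use std::marker::PhantomData;
use std::ptr;
use std::sync::atomic::{AtomicPtr, Ordering};

/// Hypothetical stand-in for Pipe; a boxed VecDeque replaces each linked list.
struct PipeSketch<T> {
    writer_head: AtomicPtr<VecDeque<T>>,
    reader_head: AtomicPtr<VecDeque<T>>,
    // Raw-pointer marker keeps the type !Send and !Sync on purpose.
    _single_thread: PhantomData<*mut T>,
}

impl<T> PipeSketch<T> {
    fn new() -> Self {
        PipeSketch {
            writer_head: AtomicPtr::new(ptr::null_mut()),
            reader_head: AtomicPtr::new(ptr::null_mut()),
            _single_thread: PhantomData,
        }
    }

    fn write(&self, value: T) {
        // Write step 1: exclusively take the writer's subqueue.
        let mut head = self.writer_head.swap(ptr::null_mut(), Ordering::AcqRel);
        // Write steps 2-3: start a new subqueue if needed, then append.
        if head.is_null() {
            head = Box::into_raw(Box::new(VecDeque::new()));
        }
        // SAFETY: `head` is non-null, came from Box::into_raw, and is
        // owned by this call until it is stored back below.
        let queue = unsafe { &mut *head };
        queue.push_back(value);
        // Write steps 4-6: hand over if the reader has nothing, else restore.
        if self.reader_head.load(Ordering::Acquire).is_null() {
            self.reader_head.store(head, Ordering::Release);
        } else {
            self.writer_head.store(head, Ordering::Release);
        }
    }

    fn read(&self) -> Option<T> {
        // Read step 1: look at the reader's own subqueue first.
        let mut head = self.reader_head.load(Ordering::Acquire);
        if head.is_null() {
            // Read step 2.1: try to take over the writer's subqueue.
            head = self.writer_head.swap(ptr::null_mut(), Ordering::AcqRel);
            // Read step 2.2: nothing anywhere, so report "empty".
            if head.is_null() {
                return None;
            }
            self.reader_head.store(head, Ordering::Release);
        }
        // Read step 3: pop the front element.
        // SAFETY: `head` is non-null and came from Box::into_raw.
        let queue = unsafe { &mut *head };
        let value = queue.pop_front();
        if queue.is_empty() {
            // A drained subqueue plays the role of "next is null".
            self.reader_head.store(ptr::null_mut(), Ordering::Release);
            // SAFETY: the pointer was just unpublished, so we own it.
            drop(unsafe { Box::from_raw(head) });
        }
        value
    }
}

impl<T> Drop for PipeSketch<T> {
    fn drop(&mut self) {
        // Free whatever subqueues are still published.
        for slot in [&mut self.writer_head, &mut self.reader_head] {
            let p = *slot.get_mut();
            if !p.is_null() {
                // SAFETY: each non-null pointer is uniquely owned by its slot.
                drop(unsafe { Box::from_raw(p) });
            }
        }
    }
}
```

Writing 1, 2, 3 and then reading four times walks through both branches: the first value is handed straight to the reader, the next two stay on the writer's side until the reader runs dry and takes them over.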

Right now I have a problem with the writer/reader test (commented out at the moment). Cargo complains about borrowing Pipe inside a thread. The Rust documentation advises using Mutex in such a case, but I don't need it in my situation because Pipe itself guarantees safety. Can anyone provide a solution for this?
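
One common way around this kind of borrow error, sketched here under assumptions, is to share the value through `Arc` and move a clone of the `Arc` into each thread; no `Mutex` is needed as long as the shared type is `Sync` and its methods take `&self`. `SharedCounter` below is a hypothetical stand-in for Pipe, not the real type.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

/// Hypothetical stand-in for Pipe: its `&self` methods are safe to call
/// from several threads because all shared state is atomic.
struct SharedCounter {
    hits: AtomicUsize,
}

impl SharedCounter {
    fn record(&self) {
        self.hits.fetch_add(1, Ordering::Relaxed);
    }

    fn total(&self) -> usize {
        self.hits.load(Ordering::Relaxed)
    }
}

/// Spawns a "writer" and a "reader" thread that share one value via Arc.
fn run_two_threads() -> usize {
    let shared = Arc::new(SharedCounter {
        hits: AtomicUsize::new(0),
    });

    // Each thread gets its own Arc handle, so nothing is borrowed from
    // the spawning function's stack frame.
    let writer = {
        let shared = Arc::clone(&shared);
        thread::spawn(move || {
            for _ in 0..1000 {
                shared.record();
            }
        })
    };
    let reader = {
        let shared = Arc::clone(&shared);
        thread::spawn(move || shared.total())
    };

    writer.join().unwrap();
    let _snapshot = reader.join().unwrap(); // some value in 0..=1000
    shared.total()
}
```

Whether a type holding raw pointers may be marked `Sync` depends on its algorithm really being safe under concurrent use, so that part remains the author's responsibility.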

Also, let me know if you find a logical mistake in my algorithm (I have tested the C++ version and it works fine).