AtomicPtr based ring buffer implementation

I am new to Rust, but I have significant background in low-level concurrency code in other languages. I am trying to build a CAS-based lock-free ring buffer for function pointers (as part of building a thread pool implementation, just to give some background).
I use an array of AtomicPtr<Box<dyn FnOnce() + Send + 'static>> as the underlying data structure. Does that look like a good choice?
I have a couple of questions for which I would appreciate any suggestions and explanations:

  • Ideally I would not need the Box indirection, but since dyn FnOnce is a dynamically sized type (DST), I cannot store a pointer to it directly in an AtomicPtr. Is this sound reasoning? Is there a better way?
  • Is it sound to store a null pointer inside an AtomicPtr? I found nothing forbidding it in the documentation, but I'd like to make doubly sure. I'd like to represent an empty slot in the ring buffer by a null pointer, using CAS operations for thread synchronization. Does that make sense or is there a better idiomatic way of doing it in Rust?
  • This is probably a real newbie question, but here goes: the push() method should take ownership of an FnOnce instance and store (a pointer to) it in the appropriate AtomicPtr - transferring ownership to the AtomicPtr and preventing it from being dropped right away. How do I do that? And how do I reclaim ownership in the pop() function that CASes out the pointer to the FnOnce?

Thanks for reading this - any help or discussion is greatly appreciated!

Just to clarify, since "ring buffer" can alas mean different things to different people: you're thinking about single-producer single-consumer FIFO communication with bounded queue capacity, correct?

If so, you don't really need pointer storage to be atomic, and you don't really need CAS either. All you need is an array (or boxed slice) of mutably aliased storage slots S: [UnsafeCell<Box<dyn FnOnce() + Send + 'static>>; N] and a pair of AtomicUsizes to mark the read index R and the write index W.

R and W are initially set to 0, R == W being understood as "buffer empty" and W == R-1 being understood as "buffer full" in this implementation.

Producer algorithm is (a code sketch covering both sides follows after the two lists):

  • Read current value of W with Ordering::Relaxed.
  • Read current value of R with Ordering::Acquire, so that the consumer's reads of the slot we are about to overwrite (on a later lap around the ring) are guaranteed to have completed before our write.
  • If W == R-1, report that queue is full and abort transaction.
  • Non-atomically write function pointer into storage slot S[W], which consumer cannot read because it will stop and declare the buffer empty upon reaching R == W.
  • Update W to W+1 with Ordering::Release. Increment does not need to be atomic, since we are the only thread that writes to W. But Release ordering is needed to propagate writes to S[W].

Consumer algorithm is:

  • Read current value of R with Ordering::Relaxed.
  • Read current value of W with Ordering::Acquire (to propagate writes from producer).
  • If R == W, report that queue is empty and abort transaction.
  • Non-atomically read function pointer from storage slot S[R], which producer cannot write because it will stop and declare the buffer full upon reaching W == R-1.
  • Update R to R+1 with Ordering::Release. Again, the increment does not need to be atomic since we are the only thread that writes to R, but Release ordering is needed so that the producer (which reads R with Acquire) cannot start overwriting S[R] before our read of it has completed.

Note that as a convenience shortcut, every time I write X+/-1, it should be understood as X+/-1 modulo N.
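
Putting the two lists together, here is a minimal sketch of such an SPSC ring. It is generic over the element type T and uses Option<T> slots rather than raw boxed pointers, purely to keep the ownership handling obvious; the names (SpscRing, push, pop) and the Option-based storage are my own simplifications, not taken from any particular crate.

use std::cell::UnsafeCell;
use std::sync::atomic::{AtomicUsize, Ordering};

// One producer thread calls push(), one consumer thread calls pop().
pub struct SpscRing<T, const N: usize> {
    slots: [UnsafeCell<Option<T>>; N],
    read: AtomicUsize,  // R: written only by the consumer
    write: AtomicUsize, // W: written only by the producer
}

// Safety: the index protocol described above ensures each slot is accessed
// by at most one thread at a time, so sharing the UnsafeCells is sound.
unsafe impl<T: Send, const N: usize> Sync for SpscRing<T, N> {}

impl<T, const N: usize> SpscRing<T, N> {
    pub fn new() -> Self {
        Self {
            slots: std::array::from_fn(|_| UnsafeCell::new(None)),
            read: AtomicUsize::new(0),
            write: AtomicUsize::new(0),
        }
    }

    /// Producer side: returns the value back if the buffer is full.
    pub fn push(&self, value: T) -> Result<(), T> {
        let w = self.write.load(Ordering::Relaxed);
        let r = self.read.load(Ordering::Acquire);
        if (w + 1) % N == r {
            return Err(value); // buffer full (W == R-1 modulo N)
        }
        // The consumer never touches slot W, because it stops at R == W.
        unsafe { *self.slots[w].get() = Some(value) };
        self.write.store((w + 1) % N, Ordering::Release);
        Ok(())
    }

    /// Consumer side: returns None if the buffer is empty.
    pub fn pop(&self) -> Option<T> {
        let r = self.read.load(Ordering::Relaxed);
        let w = self.write.load(Ordering::Acquire);
        if r == w {
            return None; // buffer empty
        }
        // The producer never touches slot R, because it stops at W == R-1.
        let value = unsafe { (*self.slots[r].get()).take() };
        self.read.store((r + 1) % N, Ordering::Release);
        value
    }
}

The Release store of W paired with the Acquire load of W publishes the slot contents to the consumer; symmetrically, the Release store of R paired with the Acquire load of R guarantees the producer never starts overwriting a slot before the consumer has finished moving the value out of it.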

Hadrien, thanks for taking the time to suggest the alternative approach for "single producer / single consumer". I'm going for multi-consumer however - the ring buffer is intended to become part of a work-stealing thread pool implementation. Therefore I'd like to CAS out consumed functions.

The part I've not been able to figure out so far is pretty much unrelated to the algorithmic part: How do I store stuff in an AtomicPtr in such a way that the contents are not dropped?

fn store_in_ptr(s: Box<String>, ptr: &AtomicPtr<String>) {
    // ??? I want to store the String in the AtomicPtr (without it being dropped, obviously)
}

You cannot store a String directly in an AtomicPtr because it contains more data than just a pointer (pointer + length + capacity). If you need to store it atomically, you need either a layer of indirection or hardware-specific larger-than-pointer atomic transactions.

The simplest indirection approach is to box the object, then use Box's into_raw and from_raw to take the box's inner pointer out and put it back in when you're done. Don't forget to drop all the remaining boxes when the queue is dropped. Also, this will only work for objects with thin pointers, not for trait objects like dyn FnMut, because pointers to those are fat (data pointer plus vtable pointer) and there is no portable hardware atomic instruction for that.
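
As a concrete, deliberately minimal sketch of that approach, reusing the store_in_ptr signature from the question above and using a null pointer to mean "empty slot" (the helper name take_from_ptr and the error handling are purely illustrative):

use std::ptr;
use std::sync::atomic::{AtomicPtr, Ordering};

// Transfer ownership of the box into the AtomicPtr.
// Returns the box instead of leaking or dropping it if the slot is occupied.
fn store_in_ptr(s: Box<String>, ptr: &AtomicPtr<String>) -> Result<(), Box<String>> {
    let raw = Box::into_raw(s); // gives up ownership; nothing is dropped here
    match ptr.compare_exchange(ptr::null_mut(), raw, Ordering::AcqRel, Ordering::Acquire) {
        Ok(_) => Ok(()),
        // CAS failed: the slot was occupied, so reclaim ownership of our value.
        Err(_) => Err(unsafe { Box::from_raw(raw) }),
    }
}

// Reclaim ownership (and the responsibility to drop) from the AtomicPtr.
fn take_from_ptr(ptr: &AtomicPtr<String>) -> Option<Box<String>> {
    let raw = ptr.swap(ptr::null_mut(), Ordering::AcqRel);
    if raw.is_null() {
        None
    } else {
        Some(unsafe { Box::from_raw(raw) })
    }
}

And regarding the earlier question: storing a null pointer in an AtomicPtr is fine; it is just a *mut T, null is a valid value for it, and null-means-empty is a common idiom.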

For those, if you want to stay portable, you will likely need a second layer of indirection. Unless you feel like reimplementing boxed trait objects using a C++-like vtable-in-object layout, which is possible but very tricky and likely to break in the future because it relies on unstable rustc implementation details.
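
To make that second layer concrete: a Box<dyn FnOnce() + Send> is itself a fat pointer, but boxing it once more gives a thin pointer that fits in an AtomicPtr. A rough sketch (the type alias and function names are mine; on a current toolchain a Box<dyn FnOnce()> is directly callable):

use std::ptr;
use std::sync::atomic::{AtomicPtr, Ordering};

// A boxed closure is a fat pointer (data + vtable)...
type Job = Box<dyn FnOnce() + Send + 'static>;

fn store_job(job: Job, slot: &AtomicPtr<Job>) {
    // ...but a Box<Job> is a plain thin pointer, so it fits in the AtomicPtr.
    let raw: *mut Job = Box::into_raw(Box::new(job));
    slot.store(raw, Ordering::Release); // a real ring slot would CAS against null instead
}

fn run_job(slot: &AtomicPtr<Job>) {
    let raw = slot.swap(ptr::null_mut(), Ordering::Acquire);
    if !raw.is_null() {
        let job: Job = *unsafe { Box::from_raw(raw) }; // reclaim both layers of boxing
        job(); // the FnOnce is consumed here
    }
}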

More advanced algorithms that do not allocate for every input also exist; basically, they use a (possibly growable) shared storage block instead of allocating a block for every input object.

This stuff gets complex quickly, so I would strongly suggest just using crossbeam's deque instead if it fits your needs. Or checking whether rayon has started exposing their internal work queue in a crate by now; IIRC they were planning to do so someday.
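
If you do go that route, the usage surface is small. A rough sketch with the crossbeam-deque crate (the exact version and the closure type are assumptions on my part):

// Cargo.toml: crossbeam-deque = "0.8"   (version is an assumption)
use crossbeam_deque::{Steal, Worker};

fn main() {
    // Each worker thread owns a Worker<T>; other threads steal through its Stealer<T>.
    let worker: Worker<Box<dyn FnOnce() + Send>> = Worker::new_fifo();
    let stealer = worker.stealer();

    worker.push(Box::new(|| println!("hello from a stolen task")));

    // A stealing thread would loop on this, retrying on Steal::Retry.
    if let Steal::Success(task) = stealer.steal() {
        task();
    }
}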

Thanks a lot - Box::into_raw and Box::from_raw are what I was looking for (and missed so far).
And thanks for the pointers to crossbeam and rayon. I'll take a look at them, but since this is a learning project for me, I'll also continue doing stuff myself.

And thanks for your insightful comments on memory management!
