Code review: Barebone triple buffer

Partly because I need to publish state from one thread to another, and partly because I want to learn how to do it, I wrote a triple buffer. I decided to constrain it to big objects, i.e. the ones that implement the Clone trait. For convenience I also require the Default trait.

The triple buffer itself holds two raw pointers to such objects, an atomic pointer and an atomic bool. The "constructor" of the buffer returns a producer and a consumer, neither of which can be cloned. Both contain an Arc to the triple buffer (for cleanup) and a raw pointer to the triple buffer.

There is no way to "push" a state into the buffer. Instead you ask the producer for a mutable reference to the state the buffer holds and write into it. After that you drop the reference and publish the state. When publishing, the buffer swaps the back pointer with the atomic pointer and sets the atomic bool to true. Then, when you ask the consumer for the state, it asks the buffer for it; if the atomic bool is true, the buffer swaps the front pointer with the atomic pointer. Either way it returns a mutable reference to the front object.

I opted not to keep the objects in an array of length 3 with an atomic u8 for bookkeeping, because this would be more complicated and error-prone. Is there a significant overhead in having both an atomic pointer and an atomic bool instead of just an atomic u8?

The design is pretty straightforward, but it uses quite a few dodgy unsafe operations. During construction I create 3 boxes holding default objects and turn them into raw pointers. Then, every time the front or back buffer needs to be borrowed, I turn the appropriate pointer into a mutable reference.
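For readers less familiar with the pattern, here is a minimal sketch of the raw-pointer round trip described above: Box::into_raw gives up ownership, and Box::from_raw must reclaim it exactly once (the Drop impl in the post does this for the three slots). The Tracked type and round_trip function are hypothetical and exist only to count drops.

```rust
use std::cell::Cell;
use std::rc::Rc;

/// Hypothetical payload that counts how often it is dropped.
struct Tracked {
    value: u64,
    drops: Rc<Cell<u32>>,
}

impl Drop for Tracked {
    fn drop(&mut self) {
        self.drops.set(self.drops.get() + 1);
    }
}

/// Leaks a heap value into a raw pointer, writes through it, then reclaims it.
fn round_trip(drops: &Rc<Cell<u32>>) -> u64 {
    // `Box::into_raw` gives up ownership: nothing frees the value automatically now.
    let raw: *mut Tracked = Box::into_raw(Box::new(Tracked {
        value: 0,
        drops: Rc::clone(drops),
    }));

    // SAFETY: `raw` comes from `Box::into_raw`, so it is non-null and aligned,
    // and no other reference to the pointee exists while this `&mut` lives.
    let slot: &mut Tracked = unsafe { &mut *raw };
    slot.value = 42;

    // SAFETY: `raw` comes from `Box::into_raw` and is reclaimed exactly once;
    // `slot` is no longer used, so no reference outlives the Box.
    let owned: Box<Tracked> = unsafe { Box::from_raw(raw) };
    owned.value // `owned` is dropped when the function returns, counting one drop
}
```

Forgetting the from_raw call leaks the value; calling it twice on the same pointer is a double free.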

I have tried it out in tests and it seems to be working as expected. However, I'm afraid that I missed something that can break it or that I have written something very inefficient.

I have pasted the important parts of my implementation. Any feedback is welcome.

use std::ptr;
use std::sync::atomic::{AtomicBool, AtomicPtr, Ordering};
use std::sync::Arc;

pub(crate) struct TripleBuffer<T>
where
  T: Send + Clone + Default,
{
  back: *mut T,
  idle: AtomicPtr<T>,
  front: *mut T,
  new_data: Arc<AtomicBool>,
}

impl<T> TripleBuffer<T>
where
  T: Send + Clone + Default,
{
  pub(crate) fn init() -> (TripleBufferProducer<T>, TripleBufferConsumer<T>) {
    let back = Box::into_raw(Box::new(T::default()));
    let idle = Box::into_raw(Box::new(T::default()));
    let front = Box::into_raw(Box::new(T::default()));
    let new_data = Arc::new(AtomicBool::new(false));
    let buffer = Arc::new(TripleBuffer {
      back,
      idle: AtomicPtr::new(idle),
      front,
      new_data,
    });
    return (TripleBufferProducer::new(buffer.clone()), TripleBufferConsumer::new(buffer));
  }

  #[inline(always)]
  pub(super) fn get_back(&mut self) -> &mut T {
    unsafe {
      return &mut *self.back;
    }
  }

  #[inline(always)]
  pub(super) fn publish_back(&mut self) {
    let swapped = self.idle.swap(self.back, Ordering::Acquire);
    self.back = swapped;
    self.new_data.store(true, Ordering::Release);
  }

  #[inline(always)]
  pub(super) fn get_new_front(&mut self) -> Option<&mut T> {
    if self.new_data.load(Ordering::Acquire) {
      let swapped = self.idle.swap(self.front, Ordering::Acquire);
      self.front = swapped;
      self.new_data.store(false, Ordering::Release);
      unsafe {
        return Some(&mut *self.front);
      }
    }
    return None;
  }

  #[inline(always)]
  pub(super) fn get_front(&mut self) -> &mut T {
    if self.new_data.load(Ordering::Acquire) {
      let swapped = self.idle.swap(self.front, Ordering::Acquire);
      self.front = swapped;
      self.new_data.store(false, Ordering::Release);
    }
    unsafe {
      return &mut *self.front;
    }
  }
}

impl<T> Drop for TripleBuffer<T>
where
  T: Send + Clone + Default,
{
  fn drop(&mut self) {
    unsafe {
      let _back = Box::from_raw(self.back);
      let _front = Box::from_raw(self.front);
      let idle_ptr = self.idle.swap(ptr::null_mut(), Ordering::Relaxed);
      if !idle_ptr.is_null() {
        let _idle = Box::from_raw(idle_ptr);
      }
    }
  }
}

// producer

pub(crate) struct TripleBufferProducer<T>
where
  T: Send + Clone + Default,
{
  pub(super) buffer: Arc<TripleBuffer<T>>,
  pub(super) ptr: *mut TripleBuffer<T>,
}

impl<T> TripleBufferProducer<T>
where
  T: Send + Clone + Default,
{
  pub fn new(buffer: Arc<TripleBuffer<T>>) -> Self {
    let ptr = Arc::as_ptr(&buffer) as *mut TripleBuffer<T>;
    Self {
      buffer,
      ptr,
    }
  }

  #[inline(always)]
  fn get_state(&mut self) -> &mut T {
    let mutable_ref = unsafe { &mut *self.ptr };
    return mutable_ref.get_back();
  }

  #[inline(always)]
  fn publish(&mut self) {
    let mutable_ref = unsafe { &mut *self.ptr };
    mutable_ref.publish_back();
  }
}

// consumer

pub(crate) struct TripleBufferConsumer<T>
where
  T: Send + Clone + Default,
{
  pub(super) buffer: Arc<TripleBuffer<T>>,
  pub(super) ptr: *mut TripleBuffer<T>,
}

impl<T> TripleBufferConsumer<T>
where
  T: Send + Clone + Default,
{
  pub fn new(buffer: Arc<TripleBuffer<T>>) -> Self {
    let ptr = Arc::as_ptr(&buffer) as *mut TripleBuffer<T>;
    Self {
      buffer,
      ptr,
    }
  }

  #[inline(always)]
  fn get_new_state(&mut self) -> Option<&mut T> {
    let mutable_ref = unsafe { &mut *self.ptr };
    return mutable_ref.get_new_front();
  }

  #[inline(always)]
  fn get_state(&mut self) -> &mut T {
    let mutable_ref = unsafe { &mut *self.ptr };
    return mutable_ref.get_front();
  }
}

Here's a playground link:

Since nobody else has replied so far, maybe it would be useful to give some usage examples, i.e. what do you want to achieve? What is the publicly exposed API (currently only the new methods are exposed)?

Also if you could share a working playground link then people can more easily experiment with it.

Further, it would help to explain why you need unsafe in the first place. Typically you would also justify, at each use of unsafe, why it is actually safe to do.
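A sketch of that convention, using a hypothetical Slot wrapper rather than code from this thread: an unsafe fn documents its preconditions in a "# Safety" section, and every call site argues in a "// SAFETY:" comment why they hold. For the code in the question, such a comment would have to explain why the pointer from Arc::as_ptr may be turned into &mut on the producer and consumer threads at the same time. It cannot be justified: two live &mut references to the same TripleBuffer are undefined behaviour even if they touch different fields.

```rust
use std::cell::UnsafeCell;

/// Hypothetical single-value cell whose exclusive access is guaranteed by the caller.
pub struct Slot<T>(UnsafeCell<T>);

impl<T> Slot<T> {
    pub fn new(value: T) -> Self {
        Slot(UnsafeCell::new(value))
    }

    /// Returns a mutable reference to the stored value through a shared `&self`.
    ///
    /// # Safety
    ///
    /// For as long as the returned reference lives, no other reference
    /// (shared or mutable) to the same value may exist, on any thread.
    pub unsafe fn get_mut_unchecked(&self) -> &mut T {
        // SAFETY: `UnsafeCell::get` never returns null, and the caller
        // guarantees exclusivity as documented above.
        unsafe { &mut *self.0.get() }
    }
}

pub fn bump(slot: &Slot<u32>) -> u32 {
    // SAFETY: `Slot` is !Sync (it wraps an `UnsafeCell`), so no other thread
    // can reach it, and no other reference into it is alive during this call.
    let v = unsafe { slot.get_mut_unchecked() };
    *v += 1;
    *v
}
```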

Hey, thanks for the reply.

My use case is to transfer the state of a limit order book (LOB). I have a thread (actually a tokio task) that keeps track of a LOB: it receives LOB updates and applies them to its own copy. The state of the LOB then needs to be transferred to another thread (tokio task). I want this to be non-blocking for both the producer and the consumer. I also want to avoid allocating memory, so the buffer is preallocated and doesn't accept an object. The object to be transferred is a struct with two vectors of pairs that represent the levels of the LOB. The producer writes the first n levels of the LOB into the buffer and then publishes it. The consumer then reads it and makes a decision.
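A minimal sketch of what such a snapshot type could look like. The names LobSnapshot, with_depth and write_top_levels, and the choice of (price, quantity) pairs as f64, are assumptions for illustration. The relevant property is that Vec::clear keeps the existing capacity, so once the buffers are large enough, rewriting the top n levels in place does not allocate.

```rust
/// Hypothetical snapshot of the top of a limit order book.
/// Each level is a (price, quantity) pair.
#[derive(Clone, Default)]
pub struct LobSnapshot {
    pub bids: Vec<(f64, f64)>,
    pub asks: Vec<(f64, f64)>,
}

impl LobSnapshot {
    /// Preallocates room for `depth` levels per side.
    pub fn with_depth(depth: usize) -> Self {
        LobSnapshot {
            bids: Vec::with_capacity(depth),
            asks: Vec::with_capacity(depth),
        }
    }

    /// Overwrites the snapshot with the first `n` levels of each side.
    /// `clear` keeps the existing capacity, so this does not reallocate
    /// as long as `n` does not exceed the preallocated depth.
    pub fn write_top_levels(&mut self, bids: &[(f64, f64)], asks: &[(f64, f64)], n: usize) {
        self.bids.clear();
        self.bids.extend_from_slice(&bids[..n.min(bids.len())]);
        self.asks.clear();
        self.asks.extend_from_slice(&asks[..n.min(asks.len())]);
    }
}
```

Note that a derived Default starts with zero capacity, so a buffer built from T::default() would still allocate on the first write into each slot; preallocating with something like with_depth avoids that.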

The reason so many things are public in my code is that it is a work in progress and I haven't tidied up the visibility yet.

Regarding the unsafe code, I didn't find a safe way to do what I wanted. Maybe this is because I come from a C++ background, though. I want the objects preallocated, I don't want to block either the producer or the consumer, and of course I don't want to copy objects around. The problem I had with safe code is that if I share the struct in a safe way, one side blocks while the other side is using the buffer, because the objects are preallocated and handed out by reference. So my solution is that the producer and the consumer each hold a raw pointer to the buffer, and I make sure to restrict them to different sides of the buffer.

The second use of unsafe is dereferencing the pointer to the object that is returned, but this again looks unavoidable since I'm storing pointers in the struct. I guess I could store the objects in an array of length 3 and use an atomic u8 for the flags instead, but I decided against this because it is more error-prone; I trust the unsafe path more here. I'm willing to do it if it makes a difference to the performance of the buffer.
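For comparison, here is a minimal sketch of the array-of-three variant; all names are hypothetical, not from the post. The idle slot's index and the "new data" flag share a single AtomicU8, so publishing and consuming are each one atomic swap and the flag can never disagree with the index. With a separate pointer and bool there is a window where the consumer has swapped but not yet stored false; if the producer publishes inside that window, the consumer's store(false) hides the fresh data until the next publish. The slots also live in UnsafeCell instead of being reached through a &mut made from a shared Arc.

```rust
use std::cell::UnsafeCell;
use std::sync::atomic::{AtomicU8, Ordering};
use std::sync::Arc;

// Low two bits: index of the idle slot. Bit 2: the idle slot holds unread data.
const INDEX_MASK: u8 = 0b011;
const NEW_DATA: u8 = 0b100;

struct Shared<T> {
    slots: [UnsafeCell<T>; 3],
    idle: AtomicU8,
}

// SAFETY: each slot index is owned by exactly one of producer, consumer or
// `idle`, and ownership only moves through AcqRel swaps, so no slot is ever
// accessed from two threads at once. Values do move between threads: T: Send.
unsafe impl<T: Send> Sync for Shared<T> {}

pub struct Producer<T> {
    shared: Arc<Shared<T>>,
    back: u8,
}

pub struct Consumer<T> {
    shared: Arc<Shared<T>>,
    front: u8,
}

pub fn triple_buffer<T: Default>() -> (Producer<T>, Consumer<T>) {
    let shared = Arc::new(Shared {
        slots: [
            UnsafeCell::new(T::default()),
            UnsafeCell::new(T::default()),
            UnsafeCell::new(T::default()),
        ],
        idle: AtomicU8::new(1), // slot 1 is idle and holds no new data
    });
    let producer = Producer { shared: Arc::clone(&shared), back: 0 };
    (producer, Consumer { shared, front: 2 })
}

impl<T> Producer<T> {
    /// Mutable access to the back slot, which only the producer owns.
    pub fn back_mut(&mut self) -> &mut T {
        // SAFETY: `self.back` is owned by this producer, and `&mut self`
        // rules out a second reference obtained through this handle.
        unsafe { &mut *self.shared.slots[self.back as usize].get() }
    }

    /// Moves the back slot into the idle position and takes the old idle slot.
    pub fn publish(&mut self) {
        // Release publishes our writes; Acquire syncs with the consumer
        // having finished with the slot we receive.
        let old = self.shared.idle.swap(self.back | NEW_DATA, Ordering::AcqRel);
        self.back = old & INDEX_MASK;
    }
}

impl<T> Consumer<T> {
    /// Swaps in the latest published slot, if any. Returns true if it did.
    pub fn update(&mut self) -> bool {
        // Only the producer sets NEW_DATA and only this swap clears it,
        // so if it is set now it is still set when we swap below.
        if self.shared.idle.load(Ordering::Relaxed) & NEW_DATA == 0 {
            return false;
        }
        let old = self.shared.idle.swap(self.front, Ordering::AcqRel);
        self.front = old & INDEX_MASK;
        true
    }

    /// Mutable access to the front slot, which only the consumer owns.
    pub fn front_mut(&mut self) -> &mut T {
        // SAFETY: `self.front` is owned by this consumer, and `&mut self`
        // prevents `update` from swapping it while this borrow lives.
        unsafe { &mut *self.shared.slots[self.front as usize].get() }
    }
}
```

Whether this is measurably faster than an AtomicPtr plus an AtomicBool would need benchmarking on the target machine; the main benefit of the sketch is that all the bookkeeping lives in one atomic value.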

Finally, I didn't add the full code with the test because from this I derive an async version whose consumer can wait for new data. I don't want to put too much code here because it would dilute my point. But if you think it is useful, I can write a test case for this version and add it here.

EDIT: Only now did I realise you said a playground link; I had just found out it exists. I will add one in a bit.

EDIT 2: I added the link.

the most basic triple buffer just needs 3 boxes: one is owned by the "producer" thread, one is owned by the "consumer" thread, and one is owned by a shared queue and is stored as an atomic pointer.

you can create a newtype wrapper on top of Box, but the idea of swapping ownership still applies.

in fact, the roles of "producer" and "consumer" are completely arbitrary; each thread just swaps its own box with the shared queue.

if you want to detect stale or dropped ticks, you can do it in the user data, e.g. the producer can maintain a monotonic counter for each packet it produced. alternatively, if the user data is sufficiently aligned, you can store some metadata in the pointer bits as an optimization too.
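A minimal sketch of the monotonic counter idea, assuming a hypothetical Packet type whose seq field the producer sets to 1, 2, 3, ... on each publish (so seq 0 means "default, nothing published yet"). The consumer compares each packet's seq with the last one it saw; in the swap scheme the consumer can get back its own older buffer, which is why anything not newer counts as stale.

```rust
/// Hypothetical payload with a producer-maintained sequence number.
#[derive(Clone, Default)]
pub struct Packet {
    pub seq: u64,
    pub value: f64,
}

/// What the consumer learned from the packet it just received.
#[derive(Debug, PartialEq)]
pub enum Freshness {
    /// Not newer than what we already saw: nothing new was published.
    Stale,
    /// New packet; `skipped` packets were overwritten before we saw them.
    Fresh { skipped: u64 },
}

/// Tracks the last sequence number seen on the consumer side.
#[derive(Default)]
pub struct SeqTracker {
    last_seen: u64,
}

impl SeqTracker {
    pub fn observe(&mut self, packet: &Packet) -> Freshness {
        if packet.seq <= self.last_seen {
            return Freshness::Stale;
        }
        let skipped = packet.seq - self.last_seen - 1;
        self.last_seen = packet.seq;
        Freshness::Fresh { skipped }
    }
}
```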

	// the shared "queue"
	let shared = AtomicQueue::<MyData>::default();
	// I use scoped threads to simplify the example
	// you can use detached threads with `Arc` without problems
	std::thread::scope(|s| {
		// the "producer" thread
		s.spawn(|| {
			let mut local_buffer = Box::new(MyData::default());
			for _ in 0..20 {
				*local_buffer = produce_data();
				// this call swaps the ownership of buffers between this thread and the queue
				local_buffer = shared.swap(local_buffer);
			}
		});
		// the "consumer" thread
		s.spawn(|| {
			let mut local_buffer = Box::new(MyData::default());
			for _ in 0..20 {
				// this call swaps the ownership of buffers between this thread and the queue
				local_buffer = shared.swap(local_buffer);
				consume_data(&local_buffer);
			}
		});
	});

the only unsafe required is to convert back from raw pointers into boxes, since we don't have atomic boxes (yet? ever? who knows).
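The AtomicQueue type itself is only in the playground, not in the post. Below is a minimal sketch of how such a wrapper might look, assuming only the Default impl and the swap(Box<T>) -> Box<T> method that the example calls; everything else is a guess. The single unsafe operation is Box::from_raw on a pointer the swap has just removed from the queue.

```rust
use std::marker::PhantomData;
use std::sync::atomic::{AtomicPtr, Ordering};

/// A single shared slot that always holds exactly one heap-allocated `T`.
pub struct AtomicQueue<T> {
    ptr: AtomicPtr<T>,
    // We own a `Box<T>`: this makes the queue Send only if `T: Send`.
    _owns: PhantomData<Box<T>>,
}

// SAFETY: sharing `&AtomicQueue<T>` only lets threads exchange whole boxes
// (never `&T`), which needs `T: Send` but not `T: Sync`.
unsafe impl<T: Send> Sync for AtomicQueue<T> {}

impl<T: Default> Default for AtomicQueue<T> {
    fn default() -> Self {
        AtomicQueue {
            ptr: AtomicPtr::new(Box::into_raw(Box::new(T::default()))),
            _owns: PhantomData,
        }
    }
}

impl<T> AtomicQueue<T> {
    /// Gives `mine` to the queue and takes back whatever box it held.
    pub fn swap(&self, mine: Box<T>) -> Box<T> {
        // AcqRel: Release publishes our writes to `mine`; Acquire makes the
        // other thread's writes to the box we receive visible to us.
        let theirs = self.ptr.swap(Box::into_raw(mine), Ordering::AcqRel);
        // SAFETY: every pointer stored in `self.ptr` came from `Box::into_raw`,
        // and the swap removed it from the queue, so we are its only owner.
        unsafe { Box::from_raw(theirs) }
    }
}

impl<T> Drop for AtomicQueue<T> {
    fn drop(&mut self) {
        // `&mut self` guarantees no concurrent swap; the slot is never null.
        let p = *self.ptr.get_mut();
        // SAFETY: the pointer came from `Box::into_raw` and is reclaimed once.
        drop(unsafe { Box::from_raw(p) });
    }
}
```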

playground:

Thanks for this. A quick question: doesn't `*local_buffer = produce_data();` allocate memory for a new object? Am I missing something here?

Compare also the helpfully named triple_buffer crate:

That also has a handy set of comparisons to other synchronization primitives.


it does NOT allocate on the heap. what it does is overwrite the old value in the Box with the new value, dropping the old value.

however, it does construct the new value first then move it into the box, which may or may not be optimized away depending on the nature of MyData.
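A quick way to check the claim above: the address of the Box's heap allocation is the same before and after assigning through it. The sketch uses a plain array as a stand-in for MyData. If MyData owned heap data such as Vecs, constructing the new value would still allocate those inner buffers, which is the case the in-place update avoids.

```rust
/// Assigns a new value through a Box and reports whether the heap
/// allocation backing the Box stayed at the same address.
fn assignment_reuses_allocation() -> bool {
    let mut local_buffer: Box<[u64; 64]> = Box::new([0; 64]);
    let before: *const [u64; 64] = &*local_buffer;
    // Writes into the existing allocation; the old array is dropped in place.
    *local_buffer = [7; 64];
    let after: *const [u64; 64] = &*local_buffer;
    before == after && local_buffer[63] == 7
}
```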

if the move is a concern, you can use in-place updates too: local_buffer is owned (as opposed to borrowed) by the producer thread, so it can do whatever it wants with it:

produce_data(&mut local_buffer);

fn produce_data(data_buffer: &mut MyData) { todo!() }

yes, that's what I was thinking. Thank you
