I have a few questions about the best way to use atomics, mainly about performance, since I only have access to an x86-64 machine to benchmark on.
First, some background. I am trying to implement a triple buffer. At any given time, the "reader" is reading one buffer, and the "writer" is writing to another buffer. The writer can also read the buffer it just finished writing to (useful for incremental/recursive updates). If the writer is faster than the reader, it will cycle between the two buffers that the reader is not using. Otherwise, the reader will wait for an update from the writer.
I implement this with an `AtomicU8` containing the index of the last-written buffer, plus a dirty bit. When the reader wants to check for updates, it fetches the last-written index and clears the dirty bit. When the writer wants to publish an update, it fetches the last-written index, swaps in the index it just wrote to, and sets the dirty bit. If the fetched dirty bit was clear, it knows that the reader has switched to the last-written index. The writer then starts writing to the unused buffer (i.e. neither the buffer it just wrote to, nor the buffer the reader is using). All of these operations are acquire-release.
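To make that concrete, here is one hand-traced round of the protocol (the mask names match the code further down; the starting point is just an example):

// state: bit 2 = dirty flag, bits 0-1 = index of the last-written buffer
// reader currently on buffer 0, writer finishing buffer 1
writer: swap(1 | DIRTY_MASK)     // state = 1 | DIRTY_MASK
reader: fetch_and(!DIRTY_MASK)   // sees dirty, moves to buffer 1; state = 1
writer: swap(2 | DIRTY_MASK)     // old value is 1 with dirty clear, so the reader
                                 // took buffer 1; the next write target is buffer 0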
First question: is this in fact sufficient to prevent data races or other unsoundness? The reader and writer both share the same `&'a UnsafeCell<[T; 3]>`, so it's important that this model is correct. Or could I get away with even more relaxed orderings?
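For reference, the synchronization edge I believe I am relying on, boiled down to a standalone toy (the names here are just for illustration, not the real code):

use std::cell::UnsafeCell;
use std::sync::atomic::{AtomicU8, Ordering::*};

const DIRTY: u8 = 0b100;

struct Toy {
    slot: UnsafeCell<u64>,
    state: AtomicU8,
}
// SAFETY: every cross-thread access to slot is ordered through state, as shown in demo.
unsafe impl Sync for Toy {}

fn demo() {
    let toy = Toy {
        slot: UnsafeCell::new(0),
        state: AtomicU8::new(0),
    };
    std::thread::scope(|s| {
        // Writer: the plain write is published by the Release half of the swap.
        s.spawn(|| {
            unsafe { *toy.slot.get() = 42 };
            toy.state.swap(DIRTY, AcqRel);
        });
        // Reader: the Acquire half of the fetch_and synchronizes with the swap, so
        // observing the dirty bit implies the write of 42 happened-before this read.
        s.spawn(|| {
            if toy.state.fetch_and(!DIRTY, AcqRel) & DIRTY != 0 {
                assert_eq!(unsafe { *toy.slot.get() }, 42);
            }
        });
    });
}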
Next: what is the best way to implement these operations for the reader? A simple way is `last_written_atomic.fetch_and(!DIRTY_MASK, AcqRel)`. However, would it be more efficient to first try `last_written_atomic.compare_exchange_weak(just_read, just_read, Relaxed, Relaxed)`, and only fall back to `fetch_and` if the comparison fails?
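In other words, something like this as the first statement of `Reader::next` (a sketch of the idea only; whether it is actually faster is exactly what I am unsure about):

// Fast path: if the state still equals the index we already hold (dirty bit clear),
// nothing has been published since the last call.
let last_raw = match self
    .tb
    .last_written
    .compare_exchange_weak(self.r, self.r, Relaxed, Relaxed)
{
    Ok(_) => self.r,
    // Dirty bit set (or a spurious failure): fall back to the real read-modify-write.
    Err(_) => self.tb.last_written.fetch_and(!DIRTY_MASK, AcqRel),
};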
What about the writer? I have been using `last_written_atomic.swap(just_written | DIRTY_MASK, AcqRel)`; is this the best way?
Lastly, I'm open to any feedback on how this could be designed better. Here is my preliminary code (Playground):
use std::{
cell::UnsafeCell,
sync::atomic::{AtomicU8, Ordering::*},
};
const INDEX_MASK: u8 = 0b011;
const DIRTY_MASK: u8 = 0b100;
pub struct TripleBuffer<T> {
buf: UnsafeCell<[T; 3]>,
last_written: AtomicU8,
}
pub struct Reader<'a, T> {
tb: &'a TripleBuffer<T>,
r: u8,
}
pub struct Writer<'a, T> {
tb: &'a TripleBuffer<T>,
r: u8,
w: u8,
}
impl<T: Default> Default for TripleBuffer<T> {
fn default() -> Self {
Self::new([T::default(), T::default(), T::default()])
}
}
impl<T> TripleBuffer<T> {
pub fn new(buf: [T; 3]) -> Self {
Self {
buf: UnsafeCell::new(buf),
            // The dirty bit starts set: otherwise the writer's first publish would conclude
            // that the reader had already moved to buffer 2 and would hand out buffer 0 for
            // writing while the reader may still be reading it.
            last_written: AtomicU8::new(2 | DIRTY_MASK),
}
}
pub fn buf(&self) -> &[T; 3] {
unsafe { &*self.buf.get() }
}
pub fn buf_mut(&mut self) -> &mut [T; 3] {
unsafe { &mut *self.buf.get() }
}
pub fn into_buf(self) -> [T; 3] {
self.buf.into_inner()
}
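    /// Splits the buffer into a connected writer/reader pair; taking &mut self ensures
    /// at most one such pair is alive at a time.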
pub fn split(&mut self) -> (Writer<'_, T>, Reader<'_, T>) {
(
Writer {
tb: self,
r: 0,
w: 1,
},
Reader { tb: self, r: 0 },
)
}
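    // SAFETY: the caller must pass i < 3 and guarantee that no mutable access to slot i
    // happens for the lifetime of the returned reference.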
unsafe fn inner_ref(&self, i: u8) -> &T {
&*(self.buf.get() as *const T).add(i as usize)
}
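    // SAFETY: the caller must pass i < 3 and guarantee exclusive access to slot i
    // for the lifetime of the returned reference.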
#[allow(clippy::mut_from_ref)]
unsafe fn inner_mut(&self, i: u8) -> &mut T {
&mut *(self.buf.get() as *mut T).add(i as usize)
}
}
impl<T> Reader<'_, T> {
/// Returns a tuple of `(updated, last_written)`
fn next(&mut self) -> (bool, &T) {
// Get the last-written space and clear the dirty bit.
let last_raw = self.tb.last_written.fetch_and(!DIRTY_MASK, AcqRel);
let dirty = last_raw & DIRTY_MASK != 0;
let last = last_raw & INDEX_MASK;
if dirty {
// Set the currently-read space to the last-written space
debug_assert_ne!(self.r, last);
self.r = last;
}
unsafe { (dirty, self.tb.inner_ref(self.r)) }
}
}
impl<T> Writer<'_, T> {
/// Returns a tuple of `(last_written, to_write)`
fn next(&mut self) -> (&T, &mut T) {
// Get the last-written space and swap with the just-written space.
let last_raw = self.tb.last_written.swap(self.w | DIRTY_MASK, AcqRel);
let dirty_clear = last_raw & DIRTY_MASK == 0;
let last = last_raw & INDEX_MASK;
debug_assert_ne!(self.r, self.w);
debug_assert_ne!(self.w, last);
if dirty_clear {
// Set the currently-read space to the last-written space.
debug_assert_ne!(self.r, last);
self.r = last;
}
let just_written = self.w;
// Set the currently-written space to the unused space: neither the currently-read space nor
// the just-written space.
self.w = 3 - self.r - self.w;
unsafe { (self.tb.inner_ref(just_written), self.tb.inner_mut(self.w)) }
}
}
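For context, this is roughly how I expect it to be driven from two threads. This is only a usage sketch (the function and values are made up), it lives in the same module since next is currently private, and it assumes the Send/Sync impls from the edit below:

fn example(tb: &mut TripleBuffer<u64>) {
    let (mut writer, mut reader) = tb.split();
    std::thread::scope(|s| {
        s.spawn(move || {
            // Fill the current write slot, then publish it with the following call.
            let (_prev, slot) = writer.next();
            *slot = 42;
            writer.next(); // publishes the 42 written above
        });
        s.spawn(move || {
            // Poll until the writer's value becomes visible.
            loop {
                let (updated, value) = reader.next();
                if updated && *value == 42 {
                    break;
                }
            }
        });
    });
}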
EDIT: Also, would these be the proper bounds for implementing `Send`/`Sync`?
unsafe impl<T: Sync> Sync for TripleBuffer<T> {}
unsafe impl<T: Sync> Sync for Reader<'_, T> {}
unsafe impl<T: Sync> Sync for Writer<'_, T> {}
unsafe impl<T: Send> Send for TripleBuffer<T> {}
unsafe impl<T: Sync> Send for Reader<'_, T> {}
unsafe impl<T: Send + Sync> Send for Writer<'_, T> {}