Atomics: Triple Buffer

I have a few questions about the best way to use atomics, mainly about performance since I only have access to an x86-64 computer to benchmark.

First, some background. I am trying to implement a triple buffer. At any given time, the "reader" is reading one buffer, and the "writer" is writing to another buffer. The writer can also read the buffer it just finished writing to (useful for incremental/recursive updates). If the writer is faster than the reader, it will cycle between the two buffers that the reader is not using. Otherwise, the reader will wait for an update from the writer.

I implement this with an AtomicU8 containing the index of the last-written buffer, along with a dirty bit. When the reader wants to check for updates, it fetches the last-written index and clears the dirty bit. When the writer wants to publish an update, it swaps in the index it just wrote to (with the dirty bit set) and looks at the previous value. If the previous value's dirty bit was clear, the writer knows that the reader switched to that last-written index. Then the writer starts writing to the unused buffer (ie. not the buffer it just wrote to, nor the buffer that the reader is using). All of these operations are acquire-release.

First question: is this in fact sufficient to prevent data races or other unsoundness? The reader and writer both hold a shared reference to the same UnsafeCell<[T; 3]> (through their &'a TripleBuffer<T>), so it's important that this model is correct. Or could I get away with even more relaxed orderings?

Next: what is the best way to implement these operations for the reader? A simple way is last_written_atomic.fetch_and(!DIRTY_MASK, AcqRel). However, would it be more efficient to first try last_written_atomic.compare_exchange_weak(just_read, just_read, Relaxed, Relaxed), and only fall back to the fetch_and if the comparison fails?

What about the writer? I have been using last_written_atomic.swap(just_written | DIRTY_MASK, AcqRel); is this the best way?

Lastly, I'm open to any feedback on how this could be designed better. Here is my preliminary code (Playground):

use std::{
    cell::UnsafeCell,
    sync::atomic::{AtomicU8, Ordering::*},
};

const INDEX_MASK: u8 = 0b011;
const DIRTY_MASK: u8 = 0b100;

pub struct TripleBuffer<T> {
    buf: UnsafeCell<[T; 3]>,
    last_written: AtomicU8,
}

pub struct Reader<'a, T> {
    tb: &'a TripleBuffer<T>,
    r: u8,
}

pub struct Writer<'a, T> {
    tb: &'a TripleBuffer<T>,
    r: u8,
    w: u8,
}

impl<T: Default> Default for TripleBuffer<T> {
    fn default() -> Self {
        Self::new([T::default(), T::default(), T::default()])
    }
}

impl<T> TripleBuffer<T> {
    pub fn new(buf: [T; 3]) -> Self {
        Self {
            buf: UnsafeCell::new(buf),
            // Must start with the dirty bit clear and the index equal to the
            // reader's initial index in `split` (0). Otherwise the writer's
            // first publish would see a clear dirty bit, conclude the reader
            // had moved to buffer 2, and start writing into buffer 0 while the
            // reader is still reading it.
            last_written: AtomicU8::new(0),
        }
    }

    pub fn buf(&self) -> &[T; 3] {
        unsafe { &*self.buf.get() }
    }

    pub fn buf_mut(&mut self) -> &mut [T; 3] {
        unsafe { &mut *self.buf.get() }
    }

    pub fn into_buf(self) -> [T; 3] {
        self.buf.into_inner()
    }

    pub fn split(&mut self) -> (Writer<'_, T>, Reader<'_, T>) {
        (
            Writer {
                tb: self,
                r: 0,
                w: 1,
            },
            Reader { tb: self, r: 0 },
        )
    }

    unsafe fn inner_ref(&self, i: u8) -> &T {
        &*(self.buf.get() as *const T).add(i as usize)
    }

    #[allow(clippy::mut_from_ref)]
    unsafe fn inner_mut(&self, i: u8) -> &mut T {
        &mut *(self.buf.get() as *mut T).add(i as usize)
    }
}

impl<T> Reader<'_, T> {
    /// Returns a tuple of `(updated, last_written)`
    fn next(&mut self) -> (bool, &T) {
        // Get the last-written space and clear the dirty bit.
        let last_raw = self.tb.last_written.fetch_and(!DIRTY_MASK, AcqRel);
        let dirty = last_raw & DIRTY_MASK != 0;
        let last = last_raw & INDEX_MASK;
        if dirty {
            // Set the currently-read space to the last-written space
            debug_assert_ne!(self.r, last);
            self.r = last;
        }
        unsafe { (dirty, self.tb.inner_ref(self.r)) }
    }
}

impl<T> Writer<'_, T> {
    /// Returns a tuple of `(last_written, to_write)`
    fn next(&mut self) -> (&T, &mut T) {
        // Get the last-written space and swap with the just-written space.
        let last_raw = self.tb.last_written.swap(self.w | DIRTY_MASK, AcqRel);
        let dirty_clear = last_raw & DIRTY_MASK == 0;
        let last = last_raw & INDEX_MASK;
        debug_assert_ne!(self.r, self.w);
        debug_assert_ne!(self.w, last);
        if dirty_clear {
            // Set the currently-read space to the last-written space.
            debug_assert_ne!(self.r, last);
            self.r = last;
        }
        let just_written = self.w;
        // Set the currently-written space to the unused space: neither the currently-read space nor
        // the just-written space.
        self.w = 3 - self.r - self.w;
        unsafe { (self.tb.inner_ref(just_written), self.tb.inner_mut(self.w)) }
    }
}

EDIT: Also, would these be the proper bounds to implement Send/Sync?:

unsafe impl<T: Sync> Sync for TripleBuffer<T> {}
unsafe impl<T: Sync> Sync for Reader<'_, T> {}
unsafe impl<T: Sync> Sync for Writer<'_, T> {}

unsafe impl<T: Send> Send for TripleBuffer<T> {}
unsafe impl<T: Sync> Send for Reader<'_, T> {}
unsafe impl<T: Send + Sync> Send for Writer<'_, T> {}
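One way to at least sanity-check which bounds end up applying (this only verifies that the impls kick in under these bounds, not that they are sound; the helper functions are just for illustration):

fn assert_send<T: Send>() {}
fn assert_sync<T: Sync>() {}

fn _check_auto_traits<'a, T: Send + Sync + 'a>() {
    assert_send::<TripleBuffer<T>>();
    assert_sync::<TripleBuffer<T>>();
    assert_send::<Reader<'a, T>>();
    assert_send::<Writer<'a, T>>();
}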

I haven't reviewed the soundness yet, but what jumped out at me immediately is your buffer.

You definitely want to align T inside the buffer to a cache line, otherwise you might get worse performance than a single-threaded application.

#[repr(align(128))]
struct Align128<T>(T);

impl<T> ::std::ops::Deref for Align128<T> {
    type Target = T;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}
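
// (Assuming the writer also needs mutable access through the wrapper, a
// matching DerefMut impl would presumably be wanted as well.)
impl<T> ::std::ops::DerefMut for Align128<T> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.0
    }
}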

pub struct TripleBuffer<T> {
    buf: UnsafeCell<[Align128<T>; 3]>,
    last_written: AtomicU8,
}

If you only care about running the code on your personal computer, you can also check the cache line size via tools like CPU-Z. It's most likely 64 bytes for every CPU cache level. Using 128 bytes is the more conservative approach.

"cache line bouncing" and "false sharing" are the relevant terms you can search for.


I haven't done a proof, so just a feeling. Is it possible the writer can overwrite something by that swap? Can it wrap around and bump into a slow reader from behind?

I think you need AcqRel for the unintuitive reason that the reader releases ownership of one block by moving on to the next one, so you need to make sure no reads are reordered past that point. Anyway, that AcqRel won't be any slower than Acquire, at least on x86. You already pay a premium for the fetch_and itself (an atomic read-modify-write operation). Doing something with just an ordinary read, and updating only if it looks suitable, might make it faster (eg. split it into a load and a compare_and_swap, as you're the only one writing there at the time).

Depending on how busy this'll be, the last_written might become a bottleneck no matter what orderings you use. That's because two CPUs will be fighting over the cache line where it lives.

Tip: apart from doing a proof, you might want to pass it through Miri.

As for the Sync/Send: only if T is also Send/Sync.

You might want to pad the items in the array to make sure they do not live in the same cache line.


@Phlopsi That's a great idea. However I might leave that as a comment and leave it up to the user, because I'm not sure it would make sense for things like Box (although I did have array storage in mind, and that would definitely apply). Or do you think it's just worth it, a maximum 384 byte overhead?

@vorner Yes, I realized that I need AcqRel for the reader, I like to think of it as "claiming" a dirty buffer. In fact it claims the buffer that would normally be the very next one written! However I was wondering about a possible optimization that had occurred to me, namely trying this first:

last_written_atomic.compare_exchange_weak(just_read, just_read, Relaxed, Relaxed)

and only continuing to the fetch_and if that returns an error. I'm not sure if this would be better or worse; I just got the idea from parking_lot's Mutex. In fact I've had a really hard time finding any information at all about the relative efficiency of atomic operations, which is why I'm asking most of these questions (this article is somewhat helpful though). I'm leaning towards just leaving this part as-is.

With these things, you always need to measure, because it can turn out either way in your specific case.

That thing with compare_exchange_weak seems a bit complicated. If I was thinking about that, I'd try using just .load(Relaxed) and compare myself.

But what my intuition tells me:

  • You're likely to have just one triple buffer, not many. So you're tuning it for two threads fighting over one.
  • It'll either be "calm enough", in which case the performance doesn't matter that much, or it'll be busy and congested, in which case you'll lose performance on the fact that two threads heavily fight over the cache line with last_written, pulling it from one cache to another and back, no matter what orderings you use.

What you store doesn't make a difference: unless the elements are already aligned to the cache line size you want the alignment wrapper, and if they already are, the wrapper has no effect and no cost.

A Box on your machine is either 8 or 16 bytes large, i.e. all 3 elements of the buffer theoretically fit in the same cache line. If they do in practice, then trying to read from buf[2] right after a write to buf[0] will invalidate the whole cache line, and the read operation will have to wait for the cache to be updated. This process can repeat indefinitely, causing attempts to read from the buffer to stall again and again, although no write ever happened to the memory region of buf[2]. From the perspective of your CPU, at least 2 elements of your buf would always overlap in the case of a Box, even if from a compiler perspective they do not.
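To make the size point concrete, here is a quick illustration (assuming a typical 64-bit target):

fn main() {
    // Three thin Box pointers take 24 bytes in total, so the whole array
    // comfortably fits inside a single 64-byte cache line.
    println!("{}", std::mem::size_of::<[Box<u32>; 3]>()); // prints 24
}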

P.S.: TripleBuffer::<T>::inner_ref and TripleBuffer::<T>::inner_mut are likely unsound, because I presume you don't want the user to multiply the provided value with ::std::mem::size_of::<T>(), but instead do that inside the methods for them.


@vorner I'm reluctant to measure because all I have is an x86-64 laptop, I have no idea how this would behave on a weakly-ordered architecture like ARM and I'm not sure how to test it (maybe on my phone??).

As for fighting over last_written, what would you suggest instead? The issue at hand is that the way this works is, the reader reads at one index (maybe 0) and the writer flip-flops between the other two indexes (maybe 1 and 2). At one point the reader needs to swap its index with one of the writer's. It has occurred to me that I could have the reader "request" a swap (Relaxed) and then wait while the writer checks for requests (also Relaxed), so the only AcqRel operation is whenever the writer sees and responds to a swap request. But my concern with that method is that the reader will be starved waiting for a fresh buffer while the writer is writing.

@Phlopsi I see what you're saying about performance. I was thinking something like Box<Box<T>> in which case the outer box would never need to change, but that doesn't really make sense now that I think about it, and I think the size penalty is probably worth it.

I realize that inner_ref and inner_mut are unsafe due to a lack of bounds checking which is why they are marked unsafe and left as private methods, but I think the offsets are correct, since (*const/*mut T).add() already multiplies by std::mem::size_of::<T>(), just like &ptr[offset] in C.
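For example, just to illustrate that .add() counts in elements rather than bytes (a toy snippet, not from the buffer code):

fn main() {
    let a = [10u64, 20, 30];
    let p = a.as_ptr();
    // p.add(2) points at a[2]; the byte offset is 2 * size_of::<u64>().
    assert_eq!(unsafe { *p.add(2) }, 30);
}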

I never had to use pointer arithmetic before. I'm positively surprised that the size multiplication can't be forgotten when using <pointer>.add!

I'm reluctant to measure because all I have is an x86-64 laptop, I have no idea how this would behave on a weakly-ordered architecture like ARM

Whereas now you have no idea how this'll behave at all? :wink: I mean, there are differences between the architectures, but there's quite a good chance that if you have something that is slow on x86, it'll also be slow on ARM. I'd say if you can measure a significant difference between two algorithms on x86, then pick the faster one, and you have a good chance it will be better on ARM too. If you have two that are about the same speed, then you don't really have much info about whether one of them might be faster on ARM, though.

Measuring on ARM is better, and I usually find some Raspberry Pi or router I have around for that (a phone looks good too), but even without those, you should use at least the data you can get your hands on.

As for fighting over last_written, what would you suggest instead?

That kind of depends on the use case. I guess for many practical cases, it's good enough and optimizing it is not worth it. I don't really know why you need this (or if this is just an exercise). Maybe even having something like [Mutex<T>; 3] would be good enough for you?

Maybe you could do something like [CacheAligned<(T, AtomicUsize)>; 3], where 0 means „unused“, 1 „owned by a writer“ and 2 „owned by reader“. Then you could do things like data[index].compare_exchange(0, 2, Acquire, Relaxed) to claim it. Now, the cache lines will still have to travel from one cache to another (because the ownership is passed between the writer and reader), but at least it might be the same cache line as the data, and they won't fight all the time, only when there's actually a switch.
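Very roughly, one such slot might look something like this (an untested sketch; all the names here are made up):

use std::{
    cell::UnsafeCell,
    sync::atomic::{AtomicUsize, Ordering::*},
};

const UNUSED: usize = 0;
const OWNED_BY_WRITER: usize = 1;
const OWNED_BY_READER: usize = 2;

// One cache-line-aligned slot, keeping the data next to its own state word.
#[repr(align(128))]
struct Slot<T> {
    state: AtomicUsize,
    value: UnsafeCell<T>,
}

impl<T> Slot<T> {
    // The reader tries to claim an unused slot: Acquire on success so the
    // writer's stores to `value` become visible, Relaxed on the failure path.
    fn try_claim_for_reader(&self) -> bool {
        self.state
            .compare_exchange(UNUSED, OWNED_BY_READER, Acquire, Relaxed)
            .is_ok()
    }
}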

@vorner If it helps, the reason I want this is to implement a basic physics simulation where each step is computed (written to buffer) based on the last, then drawn (read from buffer) to the screen. The writer is usually much faster than the <=60fps reader, but I also want it to work when the reader is faster (ie. the simulation speed drops below the screen’s refresh rate).

I know that an array of RwLocks would work (it’s what I currently do). I just wanted to see if I could make it faster. If you had an atomic for each buffer like you suggest (which I think is very similar to simply using RwLocks), in order to switch the writer, you’d have to release one atomic and acquire another, instead of just a single acquire-release atomic operation. And when it comes time to switch the reader, it could be very slow if it takes 3, 4, 5+ tries to acquire one of the two locks since you could be fighting the writer. I’m not sure how this would look in practice, I think it depends heavily on the reader/writer speeds and patterns. Maybe it would be faster, I guess I have to benchmark.

Also could you elaborate a bit about the cache lines? Is your point that having both threads write to the same atomic again and again is inefficient? In that case wouldn't it be better to just do a relaxed load followed by a compare and exchange (aka fetch_update)?

I'd say that if we are talking about hundreds or thousands of ops per second, everything mentioned here is fast enough. You could probably synchronize it through a database at these speeds :innocent:.

My idea relies on the fact that a failed attempt is really fast, because it's just a relaxed load. That's the same speed as reading from a local unsynchronized variable (unless another core holds that bit of memory in its cache and needs to be asked for it). But if you do any kind of atomic update (read+write) operation, even if it is Relaxed, then you suddenly get 100 times slower or so.

About the cache lines. You have RAM, which is Far Away and slow, so the CPU has caches. Usually, each core has its own private L1 and L2, while L3 is shared.

Now, both the cache and RAM are split into lines, not individual bytes, because the overhead would be too large per byte.

Now, if a cache has a cache line loaded, that line can be in various states. If one core loads a cache line from RAM and modifies it, it becomes „dirty“. Now another core wants to read from that cache line (either the same bytes or even different ones). It can't just go to RAM and fetch the data from there, because the first core hasn't written it back yet, so it actually has to go to the first core, ask „Hey, I want this cache line, give it to me“, and wait for it to comply. This is slow. Much slower than each CPU having its own cache line. And if two threads both alternate on accessing and modifying the same atomic variable, you get a kind of ping-pong between the two cores, fighting over the cache line.

So if you want to be really really fast, you want to limit these moves of cache lines between cores/threads. But as I said, with 60fps or even 1000fps, you probably don't really care.

And no, I can't really answer any question of „wouldn't it be faster if I did XY“, not without trying it and measuring it. I can point out some pitfalls I see, or suggest possible ideas of what might work (none of it is in category will absolutely for sure work), these are more like possible directions of experimentation and research.


@vorner Ok I think I understand what you're saying for the most part. What I don't understand is how separating it into three atomics improves performance in this specific case; I can't seem to reason about it.

I haven't been able to think of a way in any design (current design, three RwLocks, etc.) that avoids doing an acquire/release each cycle. I think the writer needs to release so that if the reader happens to claim the new buffer, writes to the just-written buffer won't show up too late; the writer needs to acquire so that writes to the new buffer won't show up too early; and the reader needs to acquire/release to synchronize with those operations. Correct me if I'm wrong about that.

I can see how atomic reads would be far cheaper than writes, and how unnecessary writes can make the cache lines flop back and forth. Since in my case I expect the buffer writer to cycle faster than the reader, I wasn't too concerned about the reader doing unnecessary atomic writes, but if I wanted to fix that (and stick with one atomic), would the code below be a good approach? I know I need to benchmark, I'm just asking if this looks right:

impl<T> Reader<'_, T> {
    /// Returns a tuple of `(updated, last_written)`
    fn next(&mut self) -> (bool, &T) {
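        // Note: comparing the raw value (index + dirty bit) against `self.r`
        // is fine: if the dirty bit is set the raw value can never equal a
        // bare index, and a clear value always matches the reader's current
        // index, so equality means there is nothing new to claim.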
        let updated = if self.tb.last_written.load(Relaxed) == self.r {
            false
        } else {
            // Set the currently-read space to the last-written space
            self.r = self.tb.last_written.fetch_and(!DIRTY_MASK, AcqRel) & INDEX_MASK;
            true
        };
        unsafe { (updated, self.tb.inner_ref(self.r)) }
    }
}

Also do you or anybody know if the Send/Sync bounds on the first post look right?

Thank you for all your help!

My idea why it should help was, if a slow reader sits on one atomic in its (now private) cache line and writer keeps using the other two, then no ping-pong would be happening. But thinking about it again, it probably doesn't work, because the writer still needs to look at the reader's atomic, doesn't it? :thinking: No, it was not a fully formed thought.

Your optimized SPSC cell fits right into my own project for providing generic atomic cell types, so I started implementing a basic version that uses boxes. I'll also write an optimized version soon. Your buffer version is kinda like having a custom arena allocator, instead of potentially (de)allocating boxes after each operation.

Btw, you may want to take a look at how I solved the when-does-my-type-impl-send-and/or-sync problem via PhantomData. All I need is a std type that I know behaves the same as my own regarding thread safety. :slightly_smiling_face:

impatience::spsc::v1
use crate::std;

pub struct Cell<T> {
    shared_ptr: std::AtomicPtr<T>,
    phantom: std::PhantomData<std::Mutex<T>>,
}

impl<T> Cell<T> {
    pub fn new(value: T) -> Self {
        Self {
            shared_ptr: std::AtomicPtr::new(std::Box::into_raw(std::Box::new(
                value,
            ))),
            phantom: std::PhantomData,
        }
    }

    pub fn split(&mut self) -> (Consumer<'_, T>, Producer<'_, T>) {
        let origin = &*self;

        (
            Consumer {
                origin,
                last_value: std::None,
            },
            Producer { origin },
        )
    }
}

pub struct Consumer<'a, T> {
    origin: &'a Cell<T>,
    last_value: std::Option<T>,
}

impl<'a, T> Consumer<'a, T> {
    pub fn get(&mut self) -> (bool, &T) {
        unsafe {
            let updated = self
                .origin
                .shared_ptr
                .swap(std::null_mut(), std::SeqCst)
                .as_mut()
                .map_or(false, |ptr| {
                    self.last_value = std::Some(*std::Box::from_raw(ptr));

                    true
                });

            (
                updated,
                self.last_value
                    .as_ref()
                    .unwrap_or_else(|| std::unreachable_unchecked()),
            )
        }
    }
}

pub struct Producer<'a, T> {
    origin: &'a Cell<T>,
}

impl<'a, T> Producer<'a, T> {
    pub fn set(&mut self, value: T) {
        unsafe {
            self.origin
                .shared_ptr
                .swap(std::Box::into_raw(std::Box::new(value)), std::SeqCst)
                .as_mut()
                .map(|ptr| std::drop(std::Box::from_raw(ptr)))
                .unwrap_or(())
        }
    }
}

@vorner Unfortunately I think the writer and reader need to check in with each other each cycle.

I guess one way to optimize is you could have the writer do some number of consecutive cycles before publishing/releasing to the reader. For example if you predict that the writer is 10x faster, you could just run 10 cycles without updating the atomic.

Or maybe you could try to have the reader request a buffer, and the writer only acquires/releases by request. It would have to be requested preemptively to avoid lag, or maybe you could use a quadruple buffer where the reader lags behind a frame. Ie., the reader always has two buffers; each cycle, if there is a new buffer ready, it will switch to that one then request another.

@Phlopsi I think crossbeam::atomic::AtomicCell might be similar to what you’re making. However it’s not quite what I’m looking for, I’m trying to implement RwLock-like semantics. Also sadly I think PhantomData won’t work because I’m using UnsafeCell, but I haven’t tried extensively so it might be possible.

From crossbeam:

Operations on AtomicCells use atomic instructions whenever possible, and synchronize using global locks otherwise. You can call AtomicCell::<T>::is_lock_free() to check whether atomic instructions or locks will be used.
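E.g. a quick check might look like this (whether it reports lock-free depends on the type, so I'm not showing the output here):

fn main() {
    use crossbeam::atomic::AtomicCell;
    // Prints whether crossbeam will use native atomics or a global lock.
    println!("u64: {}", AtomicCell::<u64>::is_lock_free());
    println!("[u8; 64]: {}", AtomicCell::<[u8; 64]>::is_lock_free());
}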

Basically, it uses locks for anything that doesn't fit into the largest available native atomic type. Mine always uses boxing and is limited to Copy types, but is also always lock-free. If crossbeam's AtomicCell<T> is lock-free, always use theirs. Mine is only useful for types that don't fit into native atomic types.

It's impossible to expose a safe and lock-free API that enables multiple writers and readers for non-Copy types. Readers have the property that they observe the same value as other readers at the same time. Due to interior mutability, handing out a reference would only be possible if the type is already Send and Sync, but then there is no point in wrapping it in an atomic cell. There is no thread-safe and lock-free counterpart to RefCell.

@Phlopsi Yes sadly that’s true. I was just trying to think of a way to reduce the number of locking operations, see if there’s any way to avoid acquire/release for every write. For example, one way I came up with is to have the reader request an update, then the writer only needs to send an update when requested. The downside is that there would be a delay for the writer to respond, but if you would rather lower latency than fresh data you could always keep a buffer or two queued up (a la vsync).

Can the &T returned by Reader::next and Writer::next point to the same data?

@Phlopsi Yes. Both methods should return the latest possible data, which can be the same data.

Anyway at this point I'm satisfied with the answers to my original questions. Thank you both for helping!

Then storing a (Ref)Cell<_> can cause a data race, i.e. your API is unsound. That's the problem with interior mutability: it allows you to obtain a &mut from a &.