Code review: TripleBuffer for sending huge objects between threads


#1

A triple buffer is a well-known concept in computer graphics:

  • One thread produces the images to be displayed and sends them one after another to the buffer mechanism.
  • Another thread fetches the images one after another from the buffer mechanism and brings them onto the screen
  • The buffer is typically only one entry deep to minimize latency

As simple as: producer --> buffer --> consumer

Normally the producer and consumer don’t have the same rate:

  • If the producer is too slow, the buffer underruns. A common strategy is to re-use the recent image (or to order a new graphics-accelerator :slightly_smiling_face:).
  • If the producer is too fast, the buffer overflows. There are two common strategies to solve this (simplified):
    • Double buffering: The producer blocks until the buffer is freed
    • Triple buffering: The producer overwrites the most recent version in the buffer

I tried to implement this triple buffer approach to asynchronously send big chunks of data from one thread to another. There shouldn’t be any heap allocations and the data should be mutable on both ends. This was achieved by always swapping buffers into/out of the buffer algorithm.

To my surprise this went really smoothly and the result was way better than I had expected:

  • Not a single explicit lifetime
  • Multiple senders and receivers are supported
  • Senders and receivers own their current data

This is my first real attempt to write something concurrent in Rust and I’d like to know if this can be improved somehow.

use std::thread;
use std::sync::{ Mutex, Arc, Barrier };
use std::sync::atomic::{ AtomicUsize, AtomicBool, Ordering };
use std::cell::Cell;
//use std::time::Duration;

struct TripleBuffer<T> {
    buffer: Mutex<Cell<T>>,
    /// true: buffer contains data ready to be retrieved by the receiver
    /// false: buffer contains a placeholder ready to be filled by a sender
    is_filled: AtomicBool,
    /// diagnostic value counting the number of failed mutex locks
    error_count: AtomicUsize,
}

#[allow(dead_code)]
impl<T> TripleBuffer<T> {

    pub fn new(initial_buffer: T, is_filled: bool) -> Self {
        TripleBuffer {
            buffer: Mutex::new(Cell::new(initial_buffer)),
            is_filled: AtomicBool::new(is_filled),
            error_count: AtomicUsize::new(0),
        }
    }

    /// double buffer approach: only set the buffer if the target is empty
    /// returns a spare buffer on success and new_buffer on failure
    pub fn fill_buffer(&self, new_buffer: T) -> (T, bool) {
        if let Ok(buffer) = self.buffer.lock() {
            let was_filled = self.is_filled.swap(true, Ordering::Relaxed);
            (if was_filled { new_buffer } else { buffer.replace(new_buffer) }, !was_filled)
        } else {
            self.error_count.fetch_add(1, Ordering::Relaxed);
            (new_buffer, false)
        }
    }

    /// triple buffer approach: always set the new buffer to be the current one
    pub fn force_buffer(&self, new_buffer: T) -> (T, bool) {
        if let Ok(buffer) = self.buffer.lock() {
            let was_filled = self.is_filled.swap(true, Ordering::Relaxed);
            (buffer.replace(new_buffer), !was_filled)
        } else {
            self.error_count.fetch_add(1, Ordering::Relaxed);
            (new_buffer, false)
        }
    }

    /// gets the current buffer by swapping in a spare one
    /// the spare one gets returned in case there isn't a current buffer present
    pub fn get_buffer(&self, spare_buffer: T) -> (T, bool) {
        if let Ok(buffer) = self.buffer.lock() {
            let was_filled = self.is_filled.swap(false, Ordering::Relaxed);
            (if was_filled { buffer.replace(spare_buffer) } else { spare_buffer }, was_filled)
        } else {
            self.error_count.fetch_add(1, Ordering::Relaxed);
            (spare_buffer, false)
        }
    }

    pub fn get_error_count(&self) -> u32 {
        self.error_count.load(Ordering::Relaxed) as u32
    }

}

fn main() {

    let initial_buffer: Box<[u16]> = vec![0u16; 1000].into_boxed_slice();
    let broker = Arc::new(TripleBuffer::new(initial_buffer, false));
    let barrier = Arc::new(Barrier::new(2));

    let sender = {
        let broker = broker.clone();
        let barrier = barrier.clone();
        thread::spawn(move || {
            //let sleep_duration = Duration::from_millis(1000);
            let mut active_buffer: Box<[u16]> = vec![0u16; 1000].into_boxed_slice();
            let mut overflow_count = 0;
            for i in 0..5 {
                // Modify the buffer, which takes some time
                active_buffer[0] = i;
                println!("Sender {:?} {:?}", i, active_buffer[0]);
                //thread::sleep(sleep_duration);

                // provoke race-condition with receiver
                barrier.wait();
                let (buffer, was_consumed) = broker.force_buffer(active_buffer);
                active_buffer = buffer;
                overflow_count += if !was_consumed { 1 } else { 0 };
            }
            println!("Total number of overflow: {:?}", overflow_count);
        })
    };

    let receiver = {
        let broker = broker.clone();
        let barrier = barrier.clone();
        thread::spawn(move || {
            //let sleep_duration = Duration::from_millis(1000);
            let mut active_buffer: Box<[u16]> = vec![0u16; 1000].into_boxed_slice();
            let mut underrun_count = 0;
            for i in 0..5 {
                // provoke race-condition with sender
                barrier.wait();
                let (buffer, was_produced) = broker.get_buffer(active_buffer);
                active_buffer = buffer;
                underrun_count += if !was_produced { 1 } else { 0 };

                // Use the buffer, which takes some time
                println!("Receiver {:?} {:?}", i, active_buffer[0]);
                //thread::sleep(sleep_duration);
            }
            println!("Total number of underruns: {:?}", underrun_count);
        })
    };

    sender.join().expect("failed to join sender");
    receiver.join().expect("failed to join receiver");

    println!("mutex errors: {:?}", broker.get_error_count());
}

What I’m not entirely happy with:

  • Some methods return (T, bool) to indicate the role of the returned value. It felt cumbersome to return Result<T, T>, but the tuple cannot simply be assigned on the caller’s end without introducing intermediate bindings (see Destructuring assignment)
  • I don’t like to have two synchronization mechanisms: The buffer’s buffer: Mutex<_> and ‘is_filled: AtomicBool’. A simple Cell<bool> would be enough for the latter, since the access to the flag gets synchronized by the Mutex's lock. Would it be better to merge the buffer along with the flag into some kind of Mutex<Cell<(T, bool)>>?
  • Is Ordering::Relaxed safe enough? Or may the compiler choose to move the atomic access out of the lock’s lifetime?
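
On the first point, here is a minimal sketch of how a Result<T, T> could be consumed without intermediate bindings. The helpers into_result and reclaim are hypothetical names that only adapt the existing (T, bool) tuple; they are not part of the posted code.

```rust
/// Hypothetical adapter: turns the `(T, bool)` pair returned by the posted
/// methods into a `Result`, where `Ok` means the swap took place.
fn into_result<T>((buffer, swapped): (T, bool)) -> Result<T, T> {
    if swapped { Ok(buffer) } else { Err(buffer) }
}

/// Hypothetical helper: takes the buffer back regardless of the outcome and
/// bumps `failures` when the swap did not happen.
fn reclaim<T>(outcome: Result<T, T>, failures: &mut u32) -> T {
    *failures += u32::from(outcome.is_err());
    match outcome {
        // Both variants carry a T, so one arm can bind either of them.
        Ok(buffer) | Err(buffer) => buffer,
    }
}
```

With these, the sender loop could be written as a single assignment, for example `active_buffer = reclaim(into_result(broker.fill_buffer(active_buffer)), &mut overflow_count);`.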

Thanks for reading so far :blush:


#2

@HadrienG wrote up some nice posts (and code) on triple buffering earlier this year: "SPMC buffer": triple buffering for multiple consumers


#4

Thanks, I came across his crate during my research and if I understood him right he focuses on Clone-types. But it seems like a good read nevertheless.


#5

Hi @fyl2xp1, I’m the author of triple-buffer, spmc-buffer, and the recap blog post quoted by @vitalyd . Let me start by reviewing your design constraints a little bit.

There shouldn’t be any heap allocations

Makes absolute sense for this data structure. Heap allocation should be kept off the hot performance path. I would however personally tolerate one heap allocation at initialization time, as long as there is no heap allocation while the structure is being used.

the data should be mutable on both ends

Sounds like a feature that I could add to triple-buffer, never thought that the consumer might want to write as well. (For spmc-buffer, it would prevent multiple consumers from sharing an output buffer, so I would be more hesitant to add it).

Not a single explicit lifetime

Should not be needed indeed, at least not on the user-friendly interface path.

Multiple senders and receivers are supported.

That design constraint seems highly questionable in the triple buffering model:

  • If there are multiple senders, it means that the data can and will often be overwritten without having been ever read, and that multiple streams of data (the N producers) race for control of the buffer’s content. Moreover, the producers need to synchronize with each other. That seems wasteful and hard to reason about, do you have a clear use case for it?
  • If there are multiple receivers, then you lose a desirable property of triple buffering, which is that the producer can be guaranteed to never block or fail to submit an update. If like spmc-buffer you try to reduce this effect by allocating more buffers, keep in mind that it is also more difficult to track which buffer is being used and which can be re-used by the producer.

As I’ll elaborate later on, your specific implementation doesn’t really work well with multiple receivers in practice (only the first receiver will get the data from the producer, the other receivers will just find an empty buffer).

Senders and receivers own their current data

I think you may really want to reconsider this one. Owning the three buffers is what allows triple-buffer to be implemented without locks (because swapping buffers is just swapping array indices, which is atomic in hardware) and to only do one copy of the data when the producer moves it in (because the consumer can be handed over a reference instead of a value).

Stay tuned for a future post where I’ll have a look at your implementation!


#6

Let’s have a look at the interface first:

pub fn new(initial_buffer: T, is_filled: bool) -> Self

Why does the code which creates the buffer need to decide on the value of the is_filled flag? That seems to be exposing the implementation unnecessarily. Ideally you could instead decide on a good default for this and explain it in the library’s documentation.


/// double buffer approach: only set the buffer if the target is empty
/// returns a spare buffer on success and new_buffer on failure
pub fn fill_buffer(&self, new_buffer: T) -> (T, bool)

Here, I will disagree with your attempt to support both double buffering and triple buffering approaches with a single data structure, because there is a fundamental difference between both: in the double buffering case, the producer may need to block, whereas in the triple buffering case, it should never need to.

The simplest way to block the producer in Rust is to leave a Thread handle in the data structure and call thread::park(). On its side, upon fetching the data, the consumer will notice the presence of the handle, move it out, and call unpark() on it in order to wake up the producer. This means that you must somehow keep storage around for a Thread handle, which is not necessary in the triple buffering case. But most importantly, this is not what you do here.
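
A rough sketch of that park/unpark handshake, assuming a single producer and a one-entry slot. BlockingSlot, State and demo are made-up names for illustration and do not come from the posted code or from any crate.

```rust
use std::sync::{Arc, Mutex};
use std::thread::{self, Thread};

/// Hypothetical one-entry slot with a blocking producer (double-buffer style).
/// Assumes a single producer, since only one parked handle is stored.
struct BlockingSlot<T> {
    state: Mutex<State<T>>,
}

struct State<T> {
    value: Option<T>,
    parked_producer: Option<Thread>,
}

impl<T> BlockingSlot<T> {
    fn new() -> Self {
        BlockingSlot {
            state: Mutex::new(State { value: None, parked_producer: None }),
        }
    }

    /// Stores `value`, parking the calling thread while the slot is occupied.
    fn send(&self, value: T) {
        let mut pending = Some(value);
        loop {
            {
                let mut state = self.state.lock().expect("slot mutex poisoned");
                if state.value.is_none() {
                    state.value = pending.take();
                    return;
                }
                // Leave a handle behind so the consumer can wake us up.
                state.parked_producer = Some(thread::current());
            } // the lock is released here, before parking
            // park() may return spuriously, so the loop re-checks the slot.
            thread::park();
        }
    }

    /// Takes the value if present and wakes a parked producer, if any.
    fn try_recv(&self) -> Option<T> {
        let (value, producer) = {
            let mut state = self.state.lock().expect("slot mutex poisoned");
            (state.value.take(), state.parked_producer.take())
        };
        if let Some(handle) = producer {
            handle.unpark();
        }
        value
    }
}

/// Sends 0..5 through the slot. The consumer polls only to keep the demo short.
fn demo() -> Vec<u32> {
    let slot = Arc::new(BlockingSlot::<u32>::new());
    let producer = {
        let slot = Arc::clone(&slot);
        thread::spawn(move || {
            for i in 0..5 {
                slot.send(i);
            }
        })
    };
    let mut received = Vec::new();
    while received.len() < 5 {
        match slot.try_recv() {
            Some(value) => received.push(value),
            None => thread::yield_now(),
        }
    }
    producer.join().expect("producer panicked");
    received
}
```

If the consumer calls unpark() before the producer has reached park(), the wake-up is not lost: the pending token makes the next park() return immediately.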

Instead, your current interface leaves the producer with no obvious way to block on insertion failure. All the producer gets is a boolean which states whether the insertion succeeded or not. The obvious thing to do in case of a failure is then to spin in a loop, trying to send data to the consumer over and over again. Such spin loops are very dangerous because if the consumer is not available for a while, the producer will waste a lot of precious CPU cycles in this process. This is why blocking is generally a better default unless you have a really good reason not to.

Moreover, in your interface, the buffer sends a T back to the producer, no matter if the insertion succeeded or not. This could potentially worsen the situation: every time fill_buffer is called, the buffer needs to return a T to the producer, which for all you know may potentially be a very large object (in the realm of graphics, for example, an 8K 32-bit image is roughly 127 MiB), and most of the time this big object will just be the one that the producer sent to the buffer as input! Unless the compiler manages to save your skin by eliding the copy, this could cause a lot of unnecessary and expensive memory traffic.

There is a more subtle issue here as well, and that’s the correctness of the producer’s algorithm. When the producer gets a T back, it does not really know what’s inside: it is just whatever the buffer used to contain, which in a multi-producer design is entirely unknown. So the safest thing to do is to wipe that T before use. If you are going to do this anyhow, the best thing to do is to just create a fresh T instead of consuming twice the memory bandwidth in order to extract one from the buffer AND reset it immediately afterwards.

This is why when designing triple-buffer, in spite of being theoretically able to provide the producer with a buffer to write in, I decided to only allow buffers to be moved in. Later on, if there is demand, I can also provide a “give me a stale buffer to write to” interface for some advanced performance optimization use cases, but I thought the “move data in and lose it forever” approach was by far the safest and easiest interface design, and was therefore the best default.

For all these reasons, I think a pub fn fill_buffer(&self, new_buffer: T) interface with no return value should be better and sufficient, unless a clear use case proves otherwise.


/// triple buffer approach: always set the new buffer to be the current one
pub fn force_buffer(&self, new_buffer: T) -> (T, bool)

The same return value considerations as above apply: I think returning a random T is generally speaking a bad idea (due to the potential large size of the type and the potentially garbage data inside). But I am also more confused about the need to return a bool in this case. Does the producer really need to know whether the buffer was previously filled or not? Can you envision a situation where it would influence its actions in a meaningful way, and do you generally think that this belongs to the default user interface rather than an expert-centric one?


/// gets the current buffer by swapping in a spare one
/// the spare one gets returned in case there isn't a current buffer present
pub fn get_buffer(&self, spare_buffer: T) -> (T, bool)

The consumer interface has the same issue as the producer one, but in reverse: it requires the consumer to send dummy data in, which if you are unlucky with compiler optimizations will result in high and unnecessary memory traffic.

Unconditionally returning a T even if the buffer was not updated is also potentially expensive. It means that the program will potentially need to shuffle a lot of data around, which if you are unlucky will just be ignored by the consumer because the boolean flag says that it should do so.

One possible way to avoid this would be to separate the action of checking whether the data has been updated, and fetching the data if that is the case:

pub fn check_buffer(&self) -> bool
pub fn get_buffer(&self, spare_buffer: T) -> T

But this interface only works with one consumer, since with multiple consumers you get a race condition: another consumer could have fetched the data between the time where you called check_buffer and the time where you called get_buffer.

And even with only one consumer, my earlier point about synchronization still stands: what should the consumer do when it figures out that no new data is available? Should it keep using the old data? Or should it spin until the producer sends more data? In triple-buffer, I decided that sticking with the old data was better, but if you want to wait for more data, then you should avoid spinning and instead use the technique described above to make the consumer fall asleep and have the producer wake it up when more data is available.


pub fn get_error_count(&self) -> u32

This sounds like a debugging feature, since in a production application, I don’t think there is a clear use case for knowing the amount of lock failures. In fact, even in debug mode, I do not see the point of counting them.

A lock failure means that someone died while holding a mutex, leaving its contents in a garbled state, so it should usually be considered a fatal error and either be handled via a panic or reported to the client. Even if you really want to handle it silently, the proper way to do so would not be to count the number of failures, but to reset the contents of the mutex, which will require a synchronization protocol on its own…

If this is a debugging feature, it should be possible to turn it off in release builds in order to get rid of its performance impact, using something like Cargo features.


#7

Now, onto the implementation. First, let us look at the data structure.

buffer: Mutex<Cell<T>>,

The use of a mutex is highly unfortunate, since the point of a triple buffer is to be a nonblocking synchronization primitive. The reason why you need it here is that your design requires copying data in and out of a single shared memory cell, and since copying data of type T is not necessarily atomic in hardware, it requires serializing the readers and the writers so that they take turns.

Another less obvious property of this design is that it requires two memory copies per message passing transaction. One to put data inside of the Mutex, and another to take it out. The consumer cannot access the data in place, because it would need to hold the lock during the entire process, which would block the producer.

To eliminate this serialization, you need to have multiple shared memory cells, so that the consumer can read a version of the data while the producer is writing to another. The number of memory cells needed for lock-free operation depends on the number of producers and consumers; here I will assume that you only have one producer and one consumer for simplicity.

With two shared memory cells, you get double buffering: simultaneous reading and writing is possible, but when either of the producer or the consumer is finished, it needs to wait for the other in order to be sure that the read and write buffers are swapped when both of them are idle. With three shared memory cells, you can achieve lock-free reading, writing and swapping, so everything becomes nonblocking, which is why triple buffering is so great to begin with.

Which goes back to my original point: having the producer and consumer own their buffer greatly hampers the efficiency of the synchronization protocol, which is why I would suggest that you reconsider this specific design choice.


/// true: buffer contains data ready to be retrieved by the receiver
/// false: buffer contains a placeholder ready to be filled by a sender
is_filled: AtomicBool,

There needs to be a flag which tells the reader that there is new data pending, but you can make the synchronization protocol more efficient by using some bit tricks in order to…

  • Tell, in a single relaxed atomic read, whether there is incoming data
  • If so, clear the flag and acquire access to the data in a single atomic swap with acquire ordering

The associated trickery is a bit involved, so I suggest you have a look at triple-buffer’s source code to see how I did it and come back to me if you have any questions. Basically, the key insight is that because you have few buffers, you can pack a buffer index and a dirty flag in a single atomic integer.
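
To give an idea of the packing, here is a minimal sketch of only the index bookkeeping (the three buffers themselves are left out). The names and bit layout are assumptions made for illustration and are not triple-buffer's actual source; AcqRel is used on both swaps as a conservative choice.

```rust
use std::sync::atomic::{AtomicU8, Ordering};

const INDEX_MASK: u8 = 0b011; // bits 0-1: index of the back buffer (0..=2)
const DIRTY_BIT: u8 = 0b100; // bit 2: back buffer holds unread data

/// Hypothetical shared word describing the back buffer.
struct BackBufferInfo(AtomicU8);

impl BackBufferInfo {
    fn new(back_index: u8) -> Self {
        BackBufferInfo(AtomicU8::new(back_index & INDEX_MASK))
    }

    /// Producer: publishes the buffer it just wrote and marks it dirty.
    /// Returns the index of the old back buffer, which it may write to next.
    fn publish(&self, written: u8) -> u8 {
        let old = self.0.swap((written & INDEX_MASK) | DIRTY_BIT, Ordering::AcqRel);
        old & INDEX_MASK
    }

    /// Consumer: cheap check, a single relaxed load.
    fn is_dirty(&self) -> bool {
        (self.0.load(Ordering::Relaxed) & DIRTY_BIT) != 0
    }

    /// Consumer: if new data is pending, trades its current read buffer for
    /// the back buffer and clears the dirty bit in the same atomic swap.
    fn fetch(&self, reading: u8) -> Option<u8> {
        if !self.is_dirty() {
            return None;
        }
        let old = self.0.swap(reading & INDEX_MASK, Ordering::AcqRel);
        Some(old & INDEX_MASK)
    }
}
```

With buffers 0 (producer), 1 (back) and 2 (consumer), `publish(0)` returns 1 and a following `fetch(2)` returns `Some(0)`, so the three indices only ever rotate between the three roles.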


/// diagnostic value counting the number failed mutex locks
error_count: AtomicUsize,

As I said, I don’t think counting the amount of failed locks is the right approach. When you fail to lock, Rust is trying to tell you that a client has panicked while holding the lock and that the data behind the lock is potentially corrupt, which is Very Bad and definitive. All future lock acquisitions will return the same error if nothing is done.

The right thing to do in this case is either to bomb right away, propagate the error up to the client code to let it choose, or somehow reset the Mutex’s state (which can easily get tricky).
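
The three options could look roughly like this on a plain Mutex<T>. This is a sketch: Poisoned and the three function names are hypothetical, and only std calls are used.

```rust
use std::mem;
use std::sync::{Mutex, PoisonError};

/// Hypothetical error type: hands the rejected entry back to the caller.
#[derive(Debug, PartialEq)]
pub struct Poisoned<T>(pub T);

/// Option 1: bomb right away.
pub fn replace_or_panic<T>(slot: &Mutex<T>, entry: T) -> T {
    let mut guard = slot.lock().expect("buffer mutex poisoned");
    mem::replace(&mut *guard, entry)
}

/// Option 2: propagate the failure and let the client code choose.
pub fn try_replace<T>(slot: &Mutex<T>, entry: T) -> Result<T, Poisoned<T>> {
    match slot.lock() {
        Ok(mut guard) => Ok(mem::replace(&mut *guard, entry)),
        Err(_) => Err(Poisoned(entry)),
    }
}

/// Option 3: ignore the poison flag and overwrite the contents.
/// The returned old value may be half-updated, and the mutex stays poisoned.
pub fn replace_ignoring_poison<T>(slot: &Mutex<T>, entry: T) -> T {
    let mut guard = slot.lock().unwrap_or_else(PoisonError::into_inner);
    mem::replace(&mut *guard, entry)
}
```

Option 3 is only reasonable when the whole value is overwritten, as here, so that nothing depends on whatever state the panicking thread left behind.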


Now, onto method implementations:

pub fn new(initial_buffer: T, is_filled: bool) -> Self {
    TripleBuffer {
        buffer: Mutex::new(Cell::new(initial_buffer)),
        is_filled: AtomicBool::new(is_filled),
        error_count: AtomicUsize::new(0),
    }
}

This is fine.


/// double buffer approach: only set the buffer if the target is empty
/// returns a spare buffer on success and new_buffer on failure
pub fn fill_buffer(&self, new_buffer: T) -> (T, bool) {
    if let Ok(buffer) = self.buffer.lock() {
        let was_filled = self.is_filled.swap(true, Ordering::Relaxed);
        (if was_filled { new_buffer } else { buffer.replace(new_buffer) }, !was_filled)
    } else {
        self.error_count.fetch_add(1, Ordering::Relaxed);
        (new_buffer, false)
    }
}

/// triple buffer approach: always set the new buffer to be the current one
pub fn force_buffer(&self, new_buffer: T) -> (T, bool) {
    if let Ok(buffer) = self.buffer.lock() {
        let was_filled = self.is_filled.swap(true, Ordering::Relaxed);
        (buffer.replace(new_buffer), !was_filled)
    } else {
        self.error_count.fetch_add(1, Ordering::Relaxed);
        (new_buffer, false)
    }
}

/// gets the current buffer by swapping in a spare one
/// the spare one gets returned in case there isn't a current buffer present
pub fn get_buffer(&self, spare_buffer: T) -> (T, bool) {
    if let Ok(buffer) = self.buffer.lock() {
        let was_filled = self.is_filled.swap(false, Ordering::Relaxed);
        (if was_filled { buffer.replace(spare_buffer) } else { spare_buffer }, was_filled)
    } else {
        self.error_count.fetch_add(1, Ordering::Relaxed);
        (spare_buffer, false)
    }
}

This is almost as efficient as it can get with multiple producers and consumers, although as mentioned above, it doesn’t work very well with multiple consumers (only the first consumer will read the correct value, the next ones will find an “empty” buffer).

With a single producer/consumer pair (which you can assert at compile time by breaking the TripleBuffer data structure into a producer half and a consumer half and requiring &mut access to the halves), you can do better.
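
A minimal sketch of such a split, assuming one heap allocation (the Arc) at initialization is acceptable. Shared, Producer, Consumer and split_buffer are hypothetical names. Neither half implements Clone and both methods take &mut self, so a second sender or receiver on the same buffer does not compile.

```rust
use std::mem;
use std::sync::{Arc, Mutex};

/// Shared state: the back entry plus its "filled" flag, behind one lock.
struct Shared<T> {
    slot: Mutex<(T, bool)>,
}

/// Hypothetical producer half.
pub struct Producer<T> {
    shared: Arc<Shared<T>>,
}

/// Hypothetical consumer half.
pub struct Consumer<T> {
    shared: Arc<Shared<T>>,
}

/// Creates the two halves around an initial placeholder entry.
pub fn split_buffer<T>(initial: T) -> (Producer<T>, Consumer<T>) {
    let shared = Arc::new(Shared { slot: Mutex::new((initial, false)) });
    (Producer { shared: Arc::clone(&shared) }, Consumer { shared })
}

impl<T> Producer<T> {
    /// Always overwrites the back entry and returns the previous one.
    pub fn send(&mut self, entry: T) -> T {
        let mut guard = self.shared.slot.lock().expect("buffer mutex poisoned");
        guard.1 = true;
        mem::replace(&mut guard.0, entry)
    }
}

impl<T> Consumer<T> {
    /// Ok(new data) if something was sent since the last call, Err(spare) otherwise.
    pub fn receive(&mut self, spare: T) -> Result<T, T> {
        let mut guard = self.shared.slot.lock().expect("buffer mutex poisoned");
        if guard.1 {
            guard.1 = false;
            Ok(mem::replace(&mut guard.0, spare))
        } else {
            Err(spare)
        }
    }
}
```

Each half can be moved into its own thread with `thread::spawn(move || ...)` as long as T is Send.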

In this scenario, atomically setting is_filled inside of the lock duplicates synchronization and makes you hold the lock longer than necessary. You could improve on either of these aspects in the following ways:

  • Make is_filled a regular boolean, stored inside of the Mutex alongside the Cell
  • In the producer, set is_filled outside of the lock, with Release ordering to ensure that this is not reordered before the memory cell is filled. Start the consumer transaction by checking is_filled with Relaxed ordering and aborting immediately if it’s false.
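
A rough sketch of the consumer-side early exit from the second bullet. FlaggedSlot and its method names are made up for illustration. One deliberate deviation, flagged here as an assumption of this sketch: the flag is still written while the lock is held. Raising it only after unlocking would leave a window in which the consumer takes a fresh entry before its flag goes up, and is later told that its own spare is new data.

```rust
use std::mem;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Mutex;

/// Hypothetical one-entry buffer with a flag that can be peeked without locking.
pub struct FlaggedSlot<T> {
    slot: Mutex<T>,
    is_filled: AtomicBool,
}

impl<T> FlaggedSlot<T> {
    pub fn new(initial: T) -> Self {
        FlaggedSlot { slot: Mutex::new(initial), is_filled: AtomicBool::new(false) }
    }

    /// Producer side (triple-buffer style): always overwrite the entry.
    pub fn force(&self, entry: T) -> T {
        let mut guard = self.slot.lock().expect("buffer mutex poisoned");
        // Relaxed is enough here: the mutex orders this store with the data.
        self.is_filled.store(true, Ordering::Relaxed);
        mem::replace(&mut *guard, entry)
    }

    /// Consumer side: returns Err(spare) without locking if nothing is pending.
    pub fn get(&self, spare: T) -> Result<T, T> {
        // Fast path: a stale `false` only reports an underrun a little early.
        if !self.is_filled.load(Ordering::Relaxed) {
            return Err(spare);
        }
        let mut guard = self.slot.lock().expect("buffer mutex poisoned");
        // Re-check under the lock; this is the authoritative test.
        if self.is_filled.swap(false, Ordering::Relaxed) {
            Ok(mem::replace(&mut *guard, spare))
        } else {
            Err(spare)
        }
    }
}
```

The benefit is on the underrun path: a consumer that polls faster than the producer fills the buffer never touches the mutex.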

As mentioned previously, leaving the clients with a “sorry, I failed” error counter is somewhat unsatisfactory from a user interface point of view. From an implementation point of view, it also makes your code more complicated to write, because you then need to refrain from telling the client that the buffer has been updated.


pub fn get_error_count(&self) -> u32 {
    self.error_count.load(Ordering::Relaxed) as u32
}

As mentioned previously, I really don’t think that this is the right approach. But it is almost the right way to implement it. Almost, because “as” casts silently truncate the data, so with enough lock errors you could overflow the counter without noticing it.
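
To illustrate the truncation, here is a small sketch comparing the posted cast with a clamping variant. Both function names are hypothetical. Simply returning usize from the getter would avoid the issue altogether.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// Mirrors the posted getter: on 64-bit targets the high bits are silently dropped.
fn error_count_truncating(counter: &AtomicUsize) -> u32 {
    counter.load(Ordering::Relaxed) as u32
}

/// Clamps to u32::MAX instead of wrapping around.
fn error_count_saturating(counter: &AtomicUsize) -> u32 {
    let count = counter.load(Ordering::Relaxed);
    if count > u32::MAX as usize {
        u32::MAX
    } else {
        count as u32
    }
}
```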


And that’s the end of my review. Hope this helped! By the way, if you ultimately want to contribute some ideas or code to triple-buffer and spmc-buffer, I’m totally open to that.

I just created a ticket for your idea of giving a way to write into the buffers in place in the bugtrackers of triple-buffer and spmc-buffer, which should hopefully give you an idea of which trade-offs are involved when an efficient synchronization protocol based on atomic integer swaps is used.

Finally, concerning your previous remark…

Thanks, I came across his crate during my research and if I understood him right he focuses on Clone-types. But it seems like a good read nevertheless.

Actually, I don’t rely on Clone that badly. Outside of testing code, it is only used when initializing the buffers, because at that point I need to initialize N buffers with one input value.

An alternative way to resolve this problem would be to require T to be Default instead. But that would not have been my first choice, because I don’t like to impose a default-constructibility requirement on my clients. It encourages them to add default-constructors that make no sense to their type (e.g. “null states”), which happens too often in C++ out of convenience.

In principle, Rust allows me to support both options, so if there is demand from people who are really bothered by the Clone requirement, I can add a version for default-constructible types as well.
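
The two initialization strategies, sketched for three buffers. These helper functions are hypothetical and only illustrate the trait bounds involved; they are not the constructors of triple-buffer or spmc-buffer.

```rust
/// Clone-based: one input value seeds all three buffers.
fn buffers_from_clone<T: Clone>(initial: T) -> [T; 3] {
    [initial.clone(), initial.clone(), initial]
}

/// Default-based: no input value, but T must be default-constructible.
fn buffers_from_default<T: Default>() -> [T; 3] {
    [T::default(), T::default(), T::default()]
}
```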


#8

Mis-formulation on my side. The mutability was an outcome and not on purpose. If the consumer has to mangle the data somehow it might come in handy to prevent the usage of temp buffers.

Yup, that was rather by accident than on purpose :wink:. When used in triple buffered mode you still get monotonicity for each consumer - don’t have a scenario where this might come in handy, though. But it’s still useful when used in double buffered mode, where no input is lost. (to be correct: even in triple buffered mode nothing is lost per se, but I wouldn’t rely on the sender to re-send an unconsumed buffer from other senders)

My implementation swaps buffers atomically without copying them, as this would be ridiculously slow; or did I misunderstand Cell.swap()?


#9

Sorry, I did not understand this part, can you elaborate a bit?

I’m afraid you did.

Cell<T> is not about making swap cheap, but about allowing mutation from &self. On the inside, it is actually just an UnsafeCell<T>, which is a wrapper around a value of type T that allows you to bypass Rust’s normal mutability rules. All it does is to allow for reads and writes from that value from someone who only has a &self.

If you want cheap swaps, you need to swap pointers, which means that in order to retain sanity in the face of lifetime hell, you will want to swap either Boxes or array indices. Personally, I prefer to do without heap allocation when I can get away with it, as it’s terribly easy to kill performance by relying too much on it, which is why I went for array indices in triple_buffer.

But if usage of Box were not a concern, it would be an equally valid solution. And in fact, nothing prevents you from building a TripleBuffer<Box<T>> in order to make the moves cheaper. I just didn’t want to do it behind the user’s back.
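
A small sketch to check what moving a boxed slice through a Cell actually does. The function names are made up for this example. Only the handle (pointer plus length) moves, while the heap block stays where it is.

```rust
use std::cell::Cell;
use std::mem;

/// Returns true if the boxed slice still points at the same heap block after
/// being moved into and out of a `Cell`.
fn replace_keeps_heap_address() -> bool {
    let big: Box<[u16]> = vec![0u16; 1_000_000].into_boxed_slice();
    let address_before = big.as_ptr();

    let cell: Cell<Box<[u16]>> = Cell::new(vec![0u16; 1].into_boxed_slice());
    let _placeholder = cell.replace(big); // moves the handle, not the elements
    let moved = cell.into_inner();

    address_before == moved.as_ptr()
}

/// Size in bytes of what actually gets moved: a pointer plus a length.
fn handle_size() -> usize {
    mem::size_of::<Box<[u16]>>()
}
```

So for T = Box<[u16]> each swap moves two machine words, whatever the length of the slice. For an inline array type such as [u16; N] the same call would move all N elements.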


#10

I really don’t know where to start on your feedback. So I’ll do it in random order :wink:

I think I know exactly what a triple buffer normally looks like, how it normally works and what its features and guarantees are. But in practice the internal representation doesn’t matter; what matters is how it works when viewed from the outside. My implementation is explicitly not the classical approach which your implementation uses.

In my opinion a Cell<Box> is exactly what you claim I wouldn’t use: A pointer to a huge object.

Could you please tell me where exactly you think I’m copying the data? I just tried my code with a 4 GiB-Array and it still took < 5 ms to run.

Maybe more from me later on that, but the code works perfectly fine and seems performant as hell.


#11

e.g. performing a gamma correction in-place before displaying the image.

That’s one reason my getter provides a flag. If the second run re-returns the same image (underrun), the receiver knows the post-processing had already been applied.


#12

I understand well that you tried to do things differently, and that’s something which I appreciate! It’s important to not take the textbook designs as the end of the story and experiment around them. Without that, things like lock-free programming would never be where they are today.

The point of my review was only to describe what I like and dislike about the approach which you have settled for so far, and more importantly, why. I tried to review things both from the point of view of someone trying to use the library correctly, and someone working on it and trying to make it as optimal as possible (which indirectly influences the design, as the buffer ownership discussion highlights).

You said yourself in your first post that there should be no dynamic memory allocation, which is why I thought that this is what you were going for. And indeed, your current TripleBuffer design does not allocate. It does copy values of type T a lot, though, so one has to hope that the type is cheap to copy.

Note that in this context, a Mutex<Cell<T>> is really superfluous, as the exterior mutex already provides internal mutability. You could just use a Mutex<T> and mem::replace the contents.
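
In code, that suggestion boils down to something like the following sketch. The free function swap_in is a hypothetical stand-in for the body of the buffer methods.

```rust
use std::mem;
use std::sync::Mutex;

/// Puts `new_value` behind the lock and returns what was there before.
/// The MutexGuard already gives `&mut T` access, so no Cell is needed.
fn swap_in<T>(slot: &Mutex<T>, new_value: T) -> T {
    let mut guard = slot.lock().expect("buffer mutex poisoned");
    mem::replace(&mut *guard, new_value)
}
```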

Every time you are calling cell.swap(), the naive implementation of that is a copy of the original cell contents into a big temporary, a copy of the input into the cell, and a copy of the temporary into the output.

Sometimes the compiler can optimize part of this process out. For example, if it can prove that the client of swap() will never use the result, a basic optimizing compiler will optimize out the process as a mere copy from the input to the cell, and a smart optimizing compiler will realize that the modified Cell value is never used either and optimize that out as well.

Are you sure that you tried it out with an array (e.g. [u8; 4*1024*1024*1024]) and not something like a Vec (which is internally a pointer to heap-allocated data and thus cheap to move)? I would expect something which passes a 4 GB array to a function by value to crash from stack overflow shortly after startup, unless the compiler manages to optimize it out entirely.

I see. To add this feature to triple_buffer, I would need to build an expert method which looks like this…

/// Get write access to the latest read buffer, and tell whether it was modified
/// since the last time read() or read_raw() was called.
fn read_raw(&mut self) -> (&mut T, bool);

…and then I think we would agree that I would fulfill the proposed use case without any need for a Mutex.


#13

I tried exactly the code posted above (just with a size of 4_000_000_000). And as this data resides in a Box it’s pretty surely located on the heap. “Allocation-free” relates to the buffer-switching algorithm and its interface, not to the one-time initialization or the whole program.

I think a Box is pretty cheap?


#14

Ah, so you’re boxing the array. Got it. The code that you posted at the beginning of this topic did not feature any Box in the TripleBuffer struct itself, and I did not look at the main() closely enough, sorry.


#15

Thanks! This is exactly the kind of optimization I was originally looking for.


#16

This is not an optimization, just code cleanup, as the generated machine code will be exactly the same.


#17

I re-visited my code and managed to get rid of the AtomicBool and the Cell :smirk:

I also added some documentation so that one doesn’t have to consult the usage example, and renamed the whole thing to NonblockingBuffer. I merged both send strategies into a single method.

It’s still multi-producer and multi-consumer but not all combinations will make sense.

use std::thread;
use std::sync::{ Mutex, Arc, Barrier };
use std::sync::atomic::{ AtomicUsize, Ordering };
//use std::time::Duration;
use std::mem;

struct NonblockingBuffer<T> {
    /// In graphics terms this is called the 'back buffer'
    /// The attached flag determines if the contents are valid. As the buffer is only one entry deep this
    /// is equivalent to a 'buffer_full'-flag.
    /// - true = filled by a producer; ready to be read by a consumer; a write operation would overflow
    /// - false = undefined contents; ready to be filled by a producer; a read operation would underrun
    back_entry: Mutex<(T, bool)>,
    /// diagnostic value counting the number of failed mutex locks; could be used by a monitor
    error_count: AtomicUsize,
}

#[allow(dead_code)]
impl<T> NonblockingBuffer<T> {

    /// Creates a new buffer
    /// initial_entry contains the initial back_entry
    /// is_valid determines if the initial_entry is meant to be passed to the consumer on its first try
    /// to get the back_buffer
    pub fn new(initial_entry: T, is_valid: bool) -> Self {
        NonblockingBuffer {
            back_entry: Mutex::new((initial_entry, is_valid)),
            error_count: AtomicUsize::new(0),
        }
    }

    /// Swaps in a new back_entry to be retrieved by a consumer.
    ///
    /// On success the method returns an entry of undefined content, which is typically refilled
    /// by the sender for the next iteration. This comes in handy if the underlying structure is
    /// expensive to re-allocate. The returned flag is set to true to indicate a successful swap.
    ///
    /// If the buffer was already filled/valid a so-called overflow occurs and the returned flag will
    /// be false. Depending on the force-switch there are two strategies regarding the buffer:
    /// - force=false: the existing back_entry remains and the method returns the passed entry itself;
    ///                The caller might choose to resend the entry at a later time
    /// - force=true : the existing back_entry will always be replaced by the passed one and the
    ///                old valid back_entry is returned. There's an implicit guarantee that this buffer
    ///                has never been retrieved by a consumer since the last send().
    ///                The caller will typically overwrite its content and re-send it.
    pub fn send(&self, mut entry: T, force: bool) -> (T, bool) {
        if let Ok(mut guard) = self.back_entry.lock() {
            let (ref mut back_entry, ref mut is_valid) = *guard;
            let was_valid: bool = *is_valid;

            if force || !*is_valid {
                mem::swap(back_entry, &mut entry);
                *is_valid = true;
            }
            (entry, !was_valid)
        } else {
            self.error_count.fetch_add(1, Ordering::Relaxed);
            (entry, false)
        }
    }

    /// Tries to retrieve the current back_entry by swapping in a spare one.
    ///
    /// If the current back_entry is valid it is returned along with a flag set to true, indicating
    /// success. The back_entry will be replaced by the given spare entry and marked as empty/invalid.
    ///
    /// If there's no valid back_entry present a so-called buffer underrun occurs. The entry passed
    /// to this method will be returned back along with a flag set to false indicating the underrun
    /// condition. As the caller will typically pass the most recent successfully retrieved entry, it
    /// can re-use it in these cases.
    pub fn recv(&self, mut entry: T) -> (T, bool) {
        if let Ok(mut guard) = self.back_entry.lock() {
            let (ref mut back_entry, ref mut is_valid) = *guard;
            let was_valid = *is_valid;

            if *is_valid {
                mem::swap(back_entry, &mut entry);
                *is_valid = false;
            }
            (entry, was_valid)
        } else {
            self.error_count.fetch_add(1, Ordering::Relaxed);
            (entry, false)
        }
    }

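    /// Returns the number of failed mutex locks observed so far.
    pub fn get_error_count(&self) -> usize {
        // Return the counter as-is instead of truncating it to u32.
        self.error_count.load(Ordering::Relaxed)
    }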

}

fn main() {
    let buffer_size = 1_000_000_000;
    let test_count = 10;

    let initial_entry: Box<[u16]> = vec![0u16; buffer_size].into_boxed_slice();
    let broker = Arc::new(NonblockingBuffer::new(initial_entry, false));
    let barrier = Arc::new(Barrier::new(2));

    let sender = {
        let broker = broker.clone();
        let barrier = barrier.clone();
        thread::spawn(move || {
            //let sleep_duration = Duration::from_millis(10000);
            let mut back_entry: Box<[u16]> = vec![0u16; buffer_size].into_boxed_slice();
            let mut overflow_count = 0;
            for i in 0..test_count {
                // Modify the buffer, which takes some time
                let some_send_value = test_count - i;
                back_entry[buffer_size - 1] = some_send_value;
                //thread::sleep(sleep_duration);

                // provoke race-condition with receiver
                barrier.wait();
                // send next buffer
                let (entry, was_consumed) = broker.send(back_entry, true);
                println!("send #{} → {} {}", i, some_send_value, if was_consumed { "" } else { "overflow" } );
                back_entry = entry;
                overflow_count += if !was_consumed { 1 } else { 0 };
            }
            println!("Total number of overflow: {:?}", overflow_count);
        })
    };

    let receiver = {
        let broker = broker.clone();
        let barrier = barrier.clone();
        thread::spawn(move || {
            //let sleep_duration = Duration::from_millis(1000);
            let mut front_entry: Box<[u16]> = vec![0u16; buffer_size].into_boxed_slice();
            let mut underrun_count = 0;
            for i in 0..test_count {
                // provoke race-condition with sender
                barrier.wait();
                // fetch next buffer
                let (entry, was_produced) = broker.recv(front_entry);
                front_entry = entry;
                underrun_count += if !was_produced { 1 } else { 0 };

                // Use the entry, which takes some time
                let some_received_value = front_entry[buffer_size - 1];
                println!("recv #{} ← {} {}", i, some_received_value, if was_produced { "" } else { "underrun" } );
                //thread::sleep(sleep_duration);
            }
            println!("Total number of underruns: {:?}", underrun_count);
        })
    };

    sender.join().expect("failed to join sender");
    receiver.join().expect("failed to join receiver");

    println!("mutex errors: {:?}", broker.get_error_count());
}

This gives us a nice printout:

send #0 → 10 
recv #0 ← 10 
recv #1 ← 10 underrun
send #1 → 9 
send #2 → 8 overflow
recv #2 ← 8 
recv #3 ← 8 underrun
send #3 → 7 
send #4 → 6 overflow
recv #4 ← 6 
recv #5 ← 6 underrun
send #5 → 5 
send #6 → 4 overflow
recv #6 ← 4 
recv #7 ← 4 underrun
send #7 → 3 
send #8 → 2 overflow
recv #8 ← 2 
recv #9 ← 2 underrun
Total number of underruns: 5
send #9 → 1 
Total number of overflow: 4
mutex errors: 0

I’m still unhappy with:

  • The result tuple cannot simply be assigned back without introducing intermediate bindings (see first post for details and link).
  • Can this code be simplified somehow? I’m failing to integrate the tuple destructuring into the if let pattern:

if let Ok(mut guard) = self.back_entry.lock() {
    let (ref mut back_entry, ref mut is_valid) = *guard;
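
A note on the first point: newer compilers (Rust 1.59 and later) accept destructuring assignment, which should remove the need for the intermediate binding. A minimal sketch, where exchange() and run() are hypothetical stand-ins for send()/recv() and the surrounding loop:

```rust
/// Hypothetical stand-in for send()/recv(): takes a buffer by value and
/// hands one back together with a status flag.
fn exchange(mut buf: Vec<u16>, value: u16) -> (Vec<u16>, bool) {
    let was_even = value % 2 == 0;
    buf.push(value);
    (buf, was_even)
}

/// Runs a few rounds, reassigning the buffer in place each time.
fn run(rounds: u16) -> (Vec<u16>, u32) {
    let mut buf = Vec::new();
    let mut flag;
    let mut even_count = 0;
    for i in 0..rounds {
        // Destructuring assignment: the right-hand side consumes `buf`,
        // then the returned tuple is written straight back into `buf`
        // and `flag`, without a temporary `entry` binding.
        (buf, flag) = exchange(buf, i);
        if flag {
            even_count += 1;
        }
    }
    (buf, even_count)
}

fn main() {
    let (buf, even_count) = run(4);
    println!("{:?} {}", buf, even_count);
}
```

Applied to the loops in main(), this would presumably reduce to something like (back_entry, was_consumed) = broker.send(back_entry, true); with was_consumed declared beforehand.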

#18

The new documentation and reworked interface look quite a bit nicer!

But calling this a NonblockingBuffer is a bit of a misnomer: if a thread comes by while another one holds the mutex, it will block, potentially for an unbounded period of time. For the synchronization protocol to be nonblocking, one prerequisite is to get rid of the Mutex lock.
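
As a stopgap that keeps the Mutex, one could at least avoid waiting on it by using try_lock() and reporting contention to the caller. This is only a sketch of that idea, not a lock-free protocol, and try_recv and Outcome are made-up names:

```rust
use std::mem;
use std::sync::{Mutex, TryLockError};
use std::thread;

/// Hypothetical result codes for a receive attempt that never waits.
#[derive(Debug, PartialEq)]
enum Outcome {
    Fresh,     // a valid entry was swapped out
    Underrun,  // the lock was free, but nothing new was there
    Contended, // another thread held the lock; we gave up immediately
    Poisoned,  // a thread panicked while holding the lock
}

fn try_recv<T>(slot: &Mutex<(T, bool)>, mut entry: T) -> (T, Outcome) {
    match slot.try_lock() {
        Ok(mut guard) => {
            let (back_entry, is_valid) = &mut *guard;
            if *is_valid {
                mem::swap(back_entry, &mut entry);
                *is_valid = false;
                (entry, Outcome::Fresh)
            } else {
                (entry, Outcome::Underrun)
            }
        }
        Err(TryLockError::WouldBlock) => (entry, Outcome::Contended),
        Err(TryLockError::Poisoned(_)) => (entry, Outcome::Poisoned),
    }
}

fn main() {
    let slot = Mutex::new((7u32, true));
    {
        // While this guard is alive, a second thread must not wait for it.
        let _held = slot.lock().unwrap();
        let outcome = thread::scope(|s| s.spawn(|| try_recv(&slot, 0)).join().unwrap());
        assert_eq!(outcome, (0, Outcome::Contended));
    }
    assert_eq!(try_recv(&slot, 0), (7, Outcome::Fresh));
    assert_eq!(try_recv(&slot, 1), (1, Outcome::Underrun));
}
```

The caller never waits, but a thread that is descheduled while holding the lock can still starve everyone else, so this does not give the progress guarantees of a real lock-free design.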

The reason why you cannot integrate the destructuring of the mutex contents into the if let binding that acquires the lock is that what you get there is not a reference to the contents of the mutex, but an opaque wrapper type (the MutexGuard) with an overridden dereference operator. Patterns can destructure regular references, but they do not look through a Deref implementation.
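
Two possible ways around this, sketched here on free functions rather than on the NonblockingBuffer methods (the function names are invented for the example): dereference the guard explicitly and destructure the resulting plain reference, or, on Rust 1.47 and later, let Result::as_deref_mut() do the dereferencing so that the tuple pattern fits into the if let itself.

```rust
use std::mem;
use std::sync::Mutex;

/// Variant 1: explicit reborrow of the guard, then a plain tuple pattern.
fn recv_explicit<T>(slot: &Mutex<(T, bool)>, mut entry: T) -> (T, bool) {
    if let Ok(mut guard) = slot.lock() {
        // `&mut *guard` is an ordinary `&mut (T, bool)`, which patterns
        // can destructure without any `ref mut` annotations.
        let (back_entry, is_valid) = &mut *guard;
        let was_valid = *is_valid;
        if was_valid {
            mem::swap(back_entry, &mut entry);
            *is_valid = false;
        }
        (entry, was_valid)
    } else {
        (entry, false)
    }
}

/// Variant 2: `as_deref_mut()` converts Result<MutexGuard<X>, E> into
/// Result<&mut X, &mut E>, so the tuple can be matched inside the `if let`.
fn recv_combined<T>(slot: &Mutex<(T, bool)>, mut entry: T) -> (T, bool) {
    // The temporary guard stays alive (and the mutex locked) for the
    // duration of the `if let` body.
    if let Ok((back_entry, is_valid)) = slot.lock().as_deref_mut() {
        let was_valid = *is_valid;
        if was_valid {
            mem::swap(back_entry, &mut entry);
            *is_valid = false;
        }
        (entry, was_valid)
    } else {
        (entry, false)
    }
}

fn main() {
    let slot = Mutex::new((String::from("frame 1"), true));
    let (got, fresh) = recv_explicit(&slot, String::from("spare"));
    assert_eq!((got.as_str(), fresh), ("frame 1", true));
    // Nothing new has been sent, so the passed entry comes straight back.
    let (got, fresh) = recv_combined(&slot, got);
    assert_eq!((got.as_str(), fresh), ("frame 1", false));
}
```

Variant 1 keeps the guard as a named binding, which makes the lock's scope easier to see. Variant 2 is shorter but hides the guard in a temporary.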

Taking into account your good point that T may be expensive to create, I have started integrating in-place buffer modification into triple_buffer. With it, people will eventually be able to get the best of both worlds: a simple and ergonomic interface for less demanding needs, an optional “expert” interface along the lines of yours when low-level control is needed, and a lock-free implementation that guarantees predictable and maximal performance under any circumstance.

Here’s what it currently looks like:

// === PRODUCER INTERFACE ===

    /// Write a new value into the triple buffer, tell whether an unread value
    /// was overwritten because the consumer did not read it quickly enough.
    pub fn write(&mut self, value: T) -> bool;

    /// Check if the consumer has read our last submission yet
    ///
    /// This method is only intended for diagnostics purposes. Please do not let
    /// it inform your decision of sending or not sending a value, as that would
    /// effectively be building a very poor spinlock-based double buffer
    /// implementation. If what you truly need is a double buffer, build
    /// yourself a proper blocking one instead of wasting CPU time.
    ///
    pub fn consumed(&self) -> bool;

    /// Get raw access to the write buffer
    ///
    /// This advanced interface allows you to update the write buffer in place,
    /// which can in some cases improve performance by avoiding the repeated
    /// creation of values of type T when this is an expensive process.
    ///
    /// However, by opting into it, you force yourself to take into account
    /// subtle implementation details which you could normally ignore.
    ///
    /// First, the buffer does not contain the last value that you sent (which
    /// is now in the hands of the consumer). In fact, the consumer is allowed
    /// to write complete garbage into it if it feels so inclined. All you can
    /// safely assume is that it contains a valid value of type T.
    ///
    /// Second, we do not send updates automatically. You need to call
    /// raw_publish() in order to propagate a buffer update to the consumer.
    /// Alternative designs based on Drop were considered, but ultimately deemed
    /// too magical for the target audience of this method.
    ///
    #[cfg(raw)]
    pub fn raw_write_buffer(&mut self) -> &mut T;

    /// Unconditionally publish an update, checking for overwrites
    ///
    /// After updating the write buffer using raw_write_buffer(), you can use
    /// this method to publish your updates to the consumer. Like write(), this
    /// method will tell you whether an unread value was overwritten.
    ///
    #[cfg(raw)]
    pub fn raw_publish(&mut self) -> bool;


// === CONSUMER INTERFACE ===

    /// Access the latest value from the triple buffer
    pub fn read(&mut self) -> &T;

    /// Tell whether a read buffer update is incoming from the producer
    ///
    /// This method is only intended for diagnostics purposes. Please do not let
    /// it inform your decision of reading a value or not, as that would
    /// effectively be building a very poor spinlock-based double buffer
    /// implementation. If what you truly need is a double buffer, build
    /// yourself a proper blocking one instead of wasting CPU time.
    ///
    pub fn updated(&self) -> bool;

    /// Get raw access to the read buffer
    ///
    /// This advanced interface allows you to modify the contents of the read
    /// buffer, which can in some cases improve performance by avoiding the
    /// creation of values of type T when this is an expensive process. One
    /// possible application is to post-process values from the producer.
    ///
    /// However, by opting into it, you force yourself to take into account
    /// subtle implementation details which you could normally ignore.
    ///
    /// First, keep in mind that you can lose access to the current read buffer
    /// any time read() or raw_update() is called, as it can be replaced by an
    /// updated buffer from the producer automatically.
    ///
    /// Second, to reduce the potential for the aforementioned usage error, this
    /// method does not update the read buffer automatically. You need to call
    /// raw_update() in order to fetch buffer updates from the producer.
    ///
    #[cfg(raw)]
    pub fn raw_read_buffer(&mut self) -> &mut T;

    /// Update the read buffer
    ///
    /// Check for incoming updates from the producer, and if so, update our read
    /// buffer to the latest data version. This operation will overwrite any
    /// changes which you have committed into the read buffer.
    ///
    /// Return a flag telling whether an update was carried out
    ///
    #[cfg(raw)]
    pub fn raw_update(&mut self) -> bool;

The reason why the raw interface is an opt-in feature, rather than available as a default, is that there are performance implications to letting the consumer write into its read buffer, as it means that the producer needs to synchronize with consumer updates on every buffer swap. Therefore, this feature should only be enabled when people need it.
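
For readers curious how a lock-free variant can hand buffers around at all, here is a minimal sketch of one common approach: three slots plus a single atomic byte that stores the index of the back slot and a “dirty” bit. It is an illustration under my own assumptions, not the actual triple_buffer implementation, and every name in it (Shared, Producer, Consumer, triple_buffer(), write_buffer(), publish(), read_buffer(), update()) is made up for the example. Both swaps use AcqRel ordering because either side may modify the slot it receives.

```rust
use std::cell::UnsafeCell;
use std::sync::atomic::{AtomicU8, Ordering};
use std::sync::Arc;
use std::thread;

const DIRTY: u8 = 0b100; // set when the back slot holds unread data
const INDEX: u8 = 0b011; // mask selecting the slot index

struct Shared<T> {
    slots: [UnsafeCell<T>; 3],
    /// Index of the back slot, combined with DIRTY.
    back: AtomicU8,
}

// SAFETY (assumed invariant of this sketch): a slot index is owned by exactly
// one party at a time and only changes hands through atomic swaps on `back`.
unsafe impl<T: Send> Sync for Shared<T> {}

pub struct Producer<T> { shared: Arc<Shared<T>>, write_idx: u8 }
pub struct Consumer<T> { shared: Arc<Shared<T>>, read_idx: u8 }

/// Hypothetical constructor: the caller supplies all three slots, so T does
/// not have to be Clone.
pub fn triple_buffer<T>(slots: [T; 3]) -> (Producer<T>, Consumer<T>) {
    let shared = Arc::new(Shared {
        slots: slots.map(UnsafeCell::new),
        back: AtomicU8::new(1),
    });
    (Producer { shared: shared.clone(), write_idx: 0 }, Consumer { shared, read_idx: 2 })
}

impl<T> Producer<T> {
    /// In-place access to the slot currently owned by the producer.
    pub fn write_buffer(&mut self) -> &mut T {
        // SAFETY: only the producer holds `write_idx`, and we have &mut self.
        unsafe { &mut *self.shared.slots[self.write_idx as usize].get() }
    }

    /// Publishes the write slot; returns true if unread data was overwritten.
    pub fn publish(&mut self) -> bool {
        let old = self.shared.back.swap(self.write_idx | DIRTY, Ordering::AcqRel);
        self.write_idx = old & INDEX;
        old & DIRTY != 0
    }
}

impl<T> Consumer<T> {
    /// In-place access to the slot currently owned by the consumer.
    pub fn read_buffer(&mut self) -> &mut T {
        // SAFETY: only the consumer holds `read_idx`, and we have &mut self.
        unsafe { &mut *self.shared.slots[self.read_idx as usize].get() }
    }

    /// Fetches the newest published slot, if any; returns true on update.
    pub fn update(&mut self) -> bool {
        // Only the consumer clears DIRTY, so a set bit cannot vanish between
        // this check and the swap below.
        if self.shared.back.load(Ordering::Relaxed) & DIRTY == 0 {
            return false;
        }
        let old = self.shared.back.swap(self.read_idx, Ordering::AcqRel);
        self.read_idx = old & INDEX;
        true
    }
}

fn main() {
    let (mut tx, mut rx) = triple_buffer([vec![0u16; 4], vec![0u16; 4], vec![0u16; 4]]);
    let producer = thread::spawn(move || {
        for i in 1..=1000u16 {
            tx.write_buffer()[3] = i;
            tx.publish();
        }
    });
    producer.join().expect("failed to join producer");
    // Whatever was overwritten along the way, the latest value survives.
    assert!(rx.update());
    assert_eq!(rx.read_buffer()[3], 1000);
    assert!(!rx.update());
}
```

Neither side ever waits for the other here: each operation is a single atomic swap, at the price of a third buffer and of silently dropping values that were never read.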

EDIT: I’ve also added support for non-Cloneable types along the way, since that is another requirement which you found unpleasant and I don’t really need it.


#19

This was the part I missed, thanks!

I’m happy with my 8_000_000 swaps per second as this mechanism is only for internal needs, but you’re right, it’s not non-blocking.