A triple buffer is a well-known concept in computer graphics:
- One thread produces the images to be displayed and sends them one after another to the buffer mechanism.
- Another thread fetches the images one after another from the buffer mechanism and puts them on the screen.
- The buffer is typically only one entry deep to minimize latency.
As simple as: producer --> buffer --> consumer
Normally the producer and the consumer don't run at the same rate:
- If the producer is too slow, the buffer underruns. A common strategy is to re-use the most recent image (or to order a new graphics accelerator).
- If the producer is too fast, the buffer overflows. There are two common strategies to handle this (simplified; a minimal sketch of both follows after this list):
  - Double buffering: The producer blocks until the buffer is freed
  - Triple buffering: The producer overwrites the most recent version in the buffer
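To make the difference concrete, here is a minimal, self-contained sketch of both strategies on a one-entry slot. The LatestSlot type and its method names are hypothetical and ignore the buffer-reuse aspect that the actual implementation below adds; refusing to overwrite corresponds to the double-buffer behaviour, overwriting corresponds to the triple-buffer behaviour:

use std::sync::Mutex;

struct LatestSlot<T> {
    slot: Mutex<Option<T>>,
}

impl<T> LatestSlot<T> {
    fn new() -> Self {
        LatestSlot { slot: Mutex::new(None) }
    }

    /// double-buffer style: refuse to overwrite an unconsumed frame
    /// (a real double buffer would block here instead of returning)
    fn try_publish(&self, frame: T) -> Result<(), T> {
        let mut slot = self.slot.lock().unwrap();
        if slot.is_some() {
            Err(frame)           // still full: the caller keeps the frame
        } else {
            *slot = Some(frame); // slot was free
            Ok(())
        }
    }

    /// triple-buffer style: always overwrite; returns the frame that was
    /// dropped if the consumer had not fetched the previous one yet
    fn publish(&self, frame: T) -> Option<T> {
        self.slot.lock().unwrap().replace(frame)
    }

    /// consumer side: take the latest frame, leaving the slot empty
    fn take(&self) -> Option<T> {
        self.slot.lock().unwrap().take()
    }
}

fn main() {
    let slot = LatestSlot::new();
    assert_eq!(slot.publish(1), None);       // slot was empty
    assert_eq!(slot.try_publish(2), Err(2)); // double-buffer style refuses to overwrite
    assert_eq!(slot.publish(2), Some(1));    // triple-buffer style overwrites (overflow)
    assert_eq!(slot.take(), Some(2));        // the consumer only sees the latest frame
    assert_eq!(slot.take(), None);           // underrun: nothing new was produced
}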
I tried to implement this triple buffer approach to asynchronously send big chunks of data from one thread to another. There shouldn't be any heap allocations, and the data should be mutable on both ends. This was achieved by always swapping buffers into and out of the buffer mechanism.
To my surprise this went really smoothly, and the result was way better than I had expected:
- Not a single explicit lifetime
- Multiple senders and receivers are supported
- Senders and receivers own their current data
This is my first real attempt to write something concurrent in Rust and I'd like to know if this can be improved somehow.
use std::thread;
use std::sync::{ Mutex, Arc, Barrier };
use std::sync::atomic::{ AtomicUsize, AtomicBool, Ordering };
use std::cell::Cell;
//use std::time::Duration;
struct TripleBuffer<T> {
    buffer: Mutex<Cell<T>>,
    /// true: buffer contains data ready to be retrieved by the receiver
    /// false: buffer contains a placeholder ready to be filled by a sender
    is_filled: AtomicBool,
    /// diagnostic value counting the number of failed mutex locks
    error_count: AtomicUsize,
}
#[allow(dead_code)]
impl<T> TripleBuffer<T> {
    pub fn new(initial_buffer: T, is_filled: bool) -> Self {
        TripleBuffer {
            buffer: Mutex::new(Cell::new(initial_buffer)),
            is_filled: AtomicBool::new(is_filled),
            error_count: AtomicUsize::new(0),
        }
    }

    /// double buffer approach: only set the buffer if the target is empty
    /// returns a spare buffer on success and new_buffer on failure
    pub fn fill_buffer(&self, new_buffer: T) -> (T, bool) {
        if let Ok(buffer) = self.buffer.lock() {
            let was_filled = self.is_filled.swap(true, Ordering::Relaxed);
            (if was_filled { new_buffer } else { buffer.replace(new_buffer) }, !was_filled)
        } else {
            self.error_count.fetch_add(1, Ordering::Relaxed);
            (new_buffer, false)
        }
    }

    /// triple buffer approach: always set the new buffer to be the current one
    pub fn force_buffer(&self, new_buffer: T) -> (T, bool) {
        if let Ok(buffer) = self.buffer.lock() {
            let was_filled = self.is_filled.swap(true, Ordering::Relaxed);
            (buffer.replace(new_buffer), !was_filled)
        } else {
            self.error_count.fetch_add(1, Ordering::Relaxed);
            (new_buffer, false)
        }
    }

    /// gets the current buffer by swapping in a spare one
    /// the spare one gets returned in case there isn't a current buffer present
    pub fn get_buffer(&self, spare_buffer: T) -> (T, bool) {
        if let Ok(buffer) = self.buffer.lock() {
            let was_filled = self.is_filled.swap(false, Ordering::Relaxed);
            (if was_filled { buffer.replace(spare_buffer) } else { spare_buffer }, was_filled)
        } else {
            self.error_count.fetch_add(1, Ordering::Relaxed);
            (spare_buffer, false)
        }
    }

    pub fn get_error_count(&self) -> u32 {
        self.error_count.load(Ordering::Relaxed) as u32
    }
}
fn main() {
    let initial_buffer: Box<[u16]> = vec![0u16; 1000].into_boxed_slice();
    let broker = Arc::new(TripleBuffer::new(initial_buffer, false));
    let barrier = Arc::new(Barrier::new(2));

    let sender = {
        let broker = broker.clone();
        let barrier = barrier.clone();
        thread::spawn(move || {
            //let sleep_duration = Duration::from_millis(1000);
            let mut active_buffer: Box<[u16]> = vec![0u16; 1000].into_boxed_slice();
            let mut overflow_count = 0;
            for i in 0..5 {
                // Modify the buffer, which takes some time
                active_buffer[0] = i;
                println!("Sender {:?} {:?}", i, active_buffer[0]);
                //thread::sleep(sleep_duration);

                // provoke race-condition with receiver
                barrier.wait();
                let (buffer, was_consumed) = broker.force_buffer(active_buffer);
                active_buffer = buffer;
                overflow_count += if !was_consumed { 1 } else { 0 };
            }
            println!("Total number of overflows: {:?}", overflow_count);
        })
    };

    let receiver = {
        let broker = broker.clone();
        let barrier = barrier.clone();
        thread::spawn(move || {
            //let sleep_duration = Duration::from_millis(1000);
            let mut active_buffer: Box<[u16]> = vec![0u16; 1000].into_boxed_slice();
            let mut underrun_count = 0;
            for i in 0..5 {
                // provoke race-condition with sender
                barrier.wait();
                let (buffer, was_produced) = broker.get_buffer(active_buffer);
                active_buffer = buffer;
                underrun_count += if !was_produced { 1 } else { 0 };

                // Use the buffer, which takes some time
                println!("Receiver {:?} {:?}", i, active_buffer[0]);
                //thread::sleep(sleep_duration);
            }
            println!("Total number of underruns: {:?}", underrun_count);
        })
    };

    sender.join().expect("failed to join sender");
    receiver.join().expect("failed to join receiver");
    println!("mutex errors: {:?}", broker.get_error_count());
}
What I'm not entirely happy with:
- Some methods return (T, bool) to indicate the role of the result. It felt cumbersome to return Result<T, T>, but the tuple cannot simply be assigned on the caller's end without introducing intermediate bindings (see Destructuring assignment). A sketch of the Result<T, T> alternative follows after this list.
- I don't like having two synchronization mechanisms: the buffer's buffer: Mutex<_> and is_filled: AtomicBool. A simple Cell<bool> would be enough for the latter, since access to the flag gets synchronized by the Mutex's lock. Would it be better to merge the buffer and the flag into some kind of Mutex<Cell<(T, bool)>>? (This is also touched on in the sketch after this list.)
- Is Ordering::Relaxed enough safety? Or may the compiler choose to move the atomic access out of the lock's lifetime?
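For the first two points, here is a rough sketch of how the API might look with the flag folded into the Mutex and the outcome reported as Result<T, T>. The Slot and inner names are hypothetical, only force_buffer is shown, and it is one possible shape rather than a drop-in replacement:

use std::sync::Mutex;

struct Slot<T> {
    /// (buffer, is_filled): true means the buffer holds data for the receiver,
    /// false means it holds a placeholder waiting for a sender
    inner: Mutex<(T, bool)>,
}

impl<T> Slot<T> {
    /// triple buffer approach: always swap the new buffer in
    /// Ok(spare)       -> the previous content had already been consumed
    /// Err(unconsumed) -> a buffer the receiver never saw was overwritten
    pub fn force_buffer(&self, new_buffer: T) -> Result<T, T> {
        let mut guard = self.inner.lock().expect("mutex poisoned");
        let was_filled = guard.1;
        guard.1 = true;
        let old = std::mem::replace(&mut guard.0, new_buffer);
        if was_filled { Err(old) } else { Ok(old) }
    }
}

On the caller's side either variant hands a buffer back for reuse, so the intermediate bindings can be avoided with an or-pattern, e.g. active_buffer = match slot.force_buffer(active_buffer) { Ok(b) | Err(b) => b };, or with .unwrap_or_else(|b| b), since both variants carry the same type.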
Thanks for reading this far!