Based on the discussion in https://users.rust-lang.org/t/unsafely-share-memory-between-threads-single-sender-multiple-receivers/ I developed a RingBuffer to transfer data between threads in a lock-free manner. It shall be used in an environment where performance is crucial so I tried to avoid any locks and boundary checks as far as possible.
There will be one producer/writer which owns the buffer for read and write access. It hands out readers that can be used by consumer threads to access the buffer. Writing and reading will happen simultaneously.
Each reader and writer will access the shared buffer chunk-wise. Each chunk will be an array to avoid boundary checks and improve compiler optimizations.
Under the assumption that my (external) scheduler prohibits the simultaneous access of a reader and writer to the same chunks: will this code behave in a defined manner?
use std::cell::UnsafeCell;
use std::sync::Arc;
const CHUNK_SIZE: usize = 1 << 4;
const BUFFER_SIZE: usize = 1 << 7;
const BUFFER_SIZE_MASK: usize = BUFFER_SIZE - 1;
type T = f32;
type Chunk = [T; CHUNK_SIZE];
#[repr(transparent)]
struct UnsafeSync<T>(T);
unsafe impl<T> Sync for UnsafeSync<T> {}
macro_rules! array {
(@accum (0, $($_es:expr),*) -> ($($body:tt)*)) => {array!(@as_expr [$($body)*])};
(@accum (1, $($es:expr),*) -> ($($body:tt)*)) => {array!(@accum (0, $($es),*) -> ($($body)* $($es,)*))};
(@accum (2, $($es:expr),*) -> ($($body:tt)*)) => {array!(@accum (0, $($es),*) -> ($($body)* $($es,)* $($es,)*))};
(@accum (4, $($es:expr),*) -> ($($body:tt)*)) => {array!(@accum (2, $($es,)* $($es),*) -> ($($body)*))};
(@accum (8, $($es:expr),*) -> ($($body:tt)*)) => {array!(@accum (4, $($es,)* $($es),*) -> ($($body)*))};
(@accum (16, $($es:expr),*) -> ($($body:tt)*)) => {array!(@accum (8, $($es,)* $($es),*) -> ($($body)*))};
(@accum (32, $($es:expr),*) -> ($($body:tt)*)) => {array!(@accum (16, $($es,)* $($es),*) -> ($($body)*))};
(@accum (64, $($es:expr),*) -> ($($body:tt)*)) => {array!(@accum (32, $($es,)* $($es),*) -> ($($body)*))};
(@accum (128, $($es:expr),*) -> ($($body:tt)*)) => {array!(@accum (64, $($es,)* $($es),*) -> ($($body)*))};
(@accum (256, $($es:expr),*) -> ($($body:tt)*)) => {array!(@accum (128, $($es,)* $($es),*) -> ($($body)*))};
(@as_expr $e:expr) => {$e};
[$e:expr; $n:tt] => { array!(@accum ($n, $e) -> ()) };
}
struct RingBuffer {
buffer: Arc<UnsafeSync<[UnsafeCell<T>; BUFFER_SIZE]>>,
pub write_index: usize,
}
struct ChunkReader {
buffer: Arc<UnsafeSync<[UnsafeCell<T>; BUFFER_SIZE]>>,
pub read_index: usize,
}
impl RingBuffer {
fn new() -> Self {
Self {
buffer: Arc::new(UnsafeSync(array![UnsafeCell::new(T::default()); 128])),
write_index: 0,
}
}
/// Creates an temporary mutable array-view into the chunk at the current position
#[inline]
unsafe fn next_chunk_mut(&mut self) -> &mut Chunk {
let chunk_ptr = self.buffer.0[self.write_index & BUFFER_SIZE_MASK..].as_ptr();
self.write_index += CHUNK_SIZE;
&mut *(chunk_ptr as *mut Chunk)
}
// hand out a long-living reader for the internal buffer
// this reader may survive the writer
fn chunk_reader(&self) -> ChunkReader {
ChunkReader {
buffer: self.buffer.clone(),
read_index: 0,
}
}
}
impl ChunkReader {
/// Creates an temporary array-view into the chunk at the current position
#[inline]
unsafe fn next_chunk(&mut self) -> &Chunk {
let chunk_ptr = self.buffer.0[self.read_index & BUFFER_SIZE_MASK..].as_ptr();
self.read_index += CHUNK_SIZE;
&*(chunk_ptr as *const Chunk)
}
}
fn main() {
let mut buffer = RingBuffer::new();
let mut reader = buffer.chunk_reader();
println!("read empty buffer");
// iterate over whole buffer, chunk by chunk
(0..BUFFER_SIZE / CHUNK_SIZE).for_each(|_| println!("{}: {:?}", (&reader).read_index.clone(), unsafe { reader.next_chunk() }));
println!("modify buffer");
// overwrite first chunk
unsafe { buffer.next_chunk_mut() }.iter_mut().for_each(|i| *i = 123.0);
// overwrite second chunk
unsafe { buffer.next_chunk_mut() }.iter_mut().for_each(|i| *i = -123.0);
println!("read modified buffer");
// iterate over whole buffer, chunk by chunk
(0..BUFFER_SIZE / CHUNK_SIZE).for_each(|_| println!("{}: {:?}", (&reader).read_index.clone(), unsafe { reader.next_chunk() }));
}