Code review: array-based shared memory queue

Based on the discussion in https://users.rust-lang.org/t/unsafely-share-memory-between-threads-single-sender-multiple-receivers/ I developed a RingBuffer to transfer data between threads in a lock-free manner. It will be used in an environment where performance is crucial, so I tried to avoid locks and bounds checks as far as possible.

There will be one producer/writer which owns the buffer for read and write access. It hands out readers that can be used by consumer threads to access the buffer. Writing and reading will happen simultaneously.

Each reader and writer will access the shared buffer chunk-wise. Each chunk will be an array to avoid boundary checks and improve compiler optimizations.

Under the assumption that my (external) scheduler prohibits the simultaneous access of a reader and writer to the same chunks: will this code behave in a defined manner?

use std::cell::UnsafeCell;
use std::sync::Arc;

const CHUNK_SIZE: usize = 1 << 4;
const BUFFER_SIZE: usize = 1 << 7;
const BUFFER_SIZE_MASK: usize = BUFFER_SIZE - 1;

type T = f32;
type Chunk = [T; CHUNK_SIZE];

#[repr(transparent)]
struct UnsafeSync<T>(T);
unsafe impl<T> Sync for UnsafeSync<T> {}

macro_rules! array {
    (@accum (0, $($_es:expr),*) -> ($($body:tt)*)) => {array!(@as_expr [$($body)*])};
    (@accum (1, $($es:expr),*) -> ($($body:tt)*)) => {array!(@accum (0, $($es),*) -> ($($body)* $($es,)*))};
    (@accum (2, $($es:expr),*) -> ($($body:tt)*)) => {array!(@accum (0, $($es),*) -> ($($body)* $($es,)* $($es,)*))};
    (@accum (4, $($es:expr),*) -> ($($body:tt)*)) => {array!(@accum (2, $($es,)* $($es),*) -> ($($body)*))};
    (@accum (8, $($es:expr),*) -> ($($body:tt)*)) => {array!(@accum (4, $($es,)* $($es),*) -> ($($body)*))};
    (@accum (16, $($es:expr),*) -> ($($body:tt)*)) => {array!(@accum (8, $($es,)* $($es),*) -> ($($body)*))};
    (@accum (32, $($es:expr),*) -> ($($body:tt)*)) => {array!(@accum (16, $($es,)* $($es),*) -> ($($body)*))};
    (@accum (64, $($es:expr),*) -> ($($body:tt)*)) => {array!(@accum (32, $($es,)* $($es),*) -> ($($body)*))};
    (@accum (128, $($es:expr),*) -> ($($body:tt)*)) => {array!(@accum (64, $($es,)* $($es),*) -> ($($body)*))};
    (@accum (256, $($es:expr),*) -> ($($body:tt)*)) => {array!(@accum (128, $($es,)* $($es),*) -> ($($body)*))};

    (@as_expr $e:expr) => {$e};

    [$e:expr; $n:tt] => { array!(@accum ($n, $e) -> ()) };
}

struct RingBuffer {
    buffer: Arc<UnsafeSync<[UnsafeCell<T>; BUFFER_SIZE]>>,
    pub write_index: usize,
}

struct ChunkReader {
    buffer: Arc<UnsafeSync<[UnsafeCell<T>; BUFFER_SIZE]>>,
    pub read_index: usize,
}

impl RingBuffer {
    fn new() -> Self {
        Self {
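            // NB: the array! macro below needs a literal token, so the 128 here must match BUFFER_SIZE (1 << 7)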
            buffer: Arc::new(UnsafeSync(array![UnsafeCell::new(T::default()); 128])),
            write_index: 0,
        }
    }
    
    /// Creates a temporary mutable array view into the chunk at the current position
    #[inline]
    unsafe fn next_chunk_mut(&mut self) -> &mut Chunk {
        let chunk_ptr = self.buffer.0[self.write_index & BUFFER_SIZE_MASK..].as_ptr();
        self.write_index += CHUNK_SIZE;
        &mut *(chunk_ptr as *mut Chunk)
    }
    
    /// Hands out a long-lived reader for the internal buffer.
    /// This reader may outlive the writer.
    fn chunk_reader(&self) -> ChunkReader {
        ChunkReader {
            buffer: self.buffer.clone(),
            read_index: 0,
        }
    }
}

impl ChunkReader {
    /// Creates a temporary array view into the chunk at the current position
    #[inline]
    unsafe fn next_chunk(&mut self) -> &Chunk {
        let chunk_ptr = self.buffer.0[self.read_index & BUFFER_SIZE_MASK..].as_ptr();
        self.read_index += CHUNK_SIZE;
        &*(chunk_ptr as *const Chunk)
    }
}

fn main() {
    let mut buffer = RingBuffer::new();
    let mut reader = buffer.chunk_reader();
    
    println!("read empty buffer");
    // iterate over whole buffer, chunk by chunk
    (0..BUFFER_SIZE / CHUNK_SIZE).for_each(|_| println!("{}: {:?}", reader.read_index, unsafe { reader.next_chunk() }));
    
    println!("modify buffer");
    // overwrite first chunk
    unsafe { buffer.next_chunk_mut() }.iter_mut().for_each(|i| *i = 123.0);
    // overwrite second chunk
    unsafe { buffer.next_chunk_mut() }.iter_mut().for_each(|i| *i = -123.0);

    println!("read modified buffer");
    // iterate over whole buffer, chunk by chunk
    (0..BUFFER_SIZE / CHUNK_SIZE).for_each(|_| println!("{}: {:?}", reader.read_index, unsafe { reader.next_chunk() }));
}

What prevents the reader from overrunning the writer and cycling a second time through the ring buffer? I would use an atomic add to increment the write_index and have the reader go into a non-blocking delay loop when the read_index catches up to the write_index. If a usize index requires more storage than a single word on the target architecture, comparing the read_index to the write_index probably needs an atomic load of the write_index.
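
A minimal sketch of what I mean (the names and structure here are mine, purely illustrative, not part of the posted code):

use std::sync::atomic::{AtomicUsize, Ordering};

// The shared write cursor becomes an atomic: the writer publishes finished
// chunks with a Release store, readers observe them with Acquire loads.
struct SharedCursor {
    write_index: AtomicUsize,
}

impl SharedCursor {
    // Writer side: publishing with Release makes the chunk's contents
    // visible to any reader that Acquire-loads the new index.
    fn publish_chunk(&self, chunk_size: usize) {
        self.write_index.fetch_add(chunk_size, Ordering::Release);
    }

    // Reader side: non-blocking delay loop until the writer is ahead of us.
    fn wait_until_readable(&self, read_index: usize) {
        while self.write_index.load(Ordering::Acquire) <= read_index {
            std::hint::spin_loop();
        }
    }
}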

There will be a scheduler that pushes work items into a pool which only contains items that are safe to execute. Executing one of those items might produce further items that will be valid. The worker threads will fetch items from this pool. There is no 1:1 relation between readers/writers and threads.

But what happens when a reader gets ahead of the writer and is picking up items from the prior cycle of items in the pool? The ring-buffer items do not contain write indices, so with the current design there's no way for the reader to detect that it has retrieved an entry that it retrieved during its last pass over the ring buffer. If the reader remains ahead of the writer, that read thread will cycle through the entire ring buffer a second time, retrieving entries, before it gets to a point where it is again retrieving entries for the first time. If it does not remain ahead of the writer, then when the writer again passes it there will be a series of entries in the ring buffer, behind the then-current write_index, that the reading thread never retrieves.
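
One way to make that detectable (not part of the posted design; everything below is an illustrative sketch) is to stamp each chunk with a lap counter, the way stamped array-based queues do:

use std::sync::atomic::{AtomicUsize, Ordering};

// Sketch: each slot records the lap on which it was last written, so a
// reader expecting lap `n` can distinguish a fresh entry (stamp == n) from
// one left over from the previous pass (stamp == n - 1).
struct StampedSlot {
    lap: AtomicUsize,
    // ... chunk payload lives next to the stamp ...
}

fn is_fresh(slot: &StampedSlot, expected_lap: usize) -> bool {
    // Acquire here pairs with a Release store of `lap` that the writer
    // performs after filling in the payload.
    slot.lap.load(Ordering::Acquire) == expected_lap
}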

Your code (the posted block) is unsound: it is racy when used from multiple threads.
Move the writing of 123.0 onto another thread, and a read of that chunk could produce a random mix of 0s and 123s; there is nothing synchronizing the two sides (no release/acquire pairing), and nothing blocks a concurrent call to next_chunk.

You don't demonstrate anything that confines the code, so its use is unrestricted and remains unsafe.

The scheduling of the access is completely out of scope for my question! Please just take this for granted!

I just want to know if the buffer would work as expected.

Given your code, we know that write_index is always a multiple of CHUNK_SIZE.
So, if CHUNK_SIZE divides BUFFER_SIZE, as in your example, this part of the code is fine (for all a, b > 0 where a divides b, and all n >= 0: (n * a) % b <= b - a), but if these settings were ever changed in a way that breaks this divisibility invariant, your code would overflow the buffer (more precisely, lend out a mutable reference to extraneous memory).

This is the kind of invariant that requires documentation both at the constant definition and at the pointer transmutation.
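
For instance, a compile-time check can document and enforce the invariant at once (works on recent Rust; older code would reach for the static_assertions crate):

// Compile-time guards for the invariants the chunk cast relies on:
const _: () = assert!(BUFFER_SIZE % CHUNK_SIZE == 0); // no chunk straddles the end
const _: () = assert!(BUFFER_SIZE.is_power_of_two()); // required by BUFFER_SIZE_MASK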

If we "forget about data races", this looks "fine" to me. However,
one does not simply "forget about data races"

What I mean by that is that in Rust, unless you declare a function unsafe, your function must avoid any kind of UB when fed any non-unsafe-ly formed input, whatever the order / sequence of calls may be.

This is why people will tell you that the given code is not sound, and rightfully so.

Whenever the safety of some function is only guaranteed by the fact that the only code calling it does respect some additional constraint (such as scheduling accesses), then that function must be marked unsafe.

Conclusion

So in your case, all you need to do is change:

impl RingBuffer {
    /// Creates a temporary mutable array view into the chunk at the current position
    #[inline]
    fn next_chunk_mut(&mut self) -> &mut Chunk {
        let circular_index = self.write_index & BUFFER_SIZE_MASK;
        let chunk_ptr = self.buffer.0[circular_index..].as_ptr();
        self.write_index += CHUNK_SIZE;
        unsafe { &mut *(chunk_ptr as *mut Chunk) }
    }
}

and

impl ChunkReader {
    /// Creates a temporary array view into the chunk at the current position
    #[inline]
    fn next_chunk(&mut self) -> &Chunk {
        let circular_index = self.read_index & BUFFER_SIZE_MASK;
        let chunk_ptr = self.buffer.0[circular_index..].as_ptr();
        self.read_index += CHUNK_SIZE;
        unsafe { &*(chunk_ptr as *const Chunk) }
    }
}

Into

impl RingBuffer {
    /// Creates a temporary mutable array view into the chunk at the current position
    ///
    /// # Safety
    ///
    /// Accesses with corresponding [`ChunkReader`]s must be properly scheduled / synchronised
    #[inline]
    unsafe fn next_chunk_mut (&'_ mut self) -> &'_ mut Chunk
    {
        let write_index = self.write_index;
        let chunk_ptr: *const UnsafeCell<T> =
            self.buffer.0[write_index ..][.. CHUNK_SIZE].as_ptr()
        ;
        // same as `& BUFFER_SIZE_MASK`, but it becomes slower instead of UB if BUFFER_SIZE were not a power of two.
        self.write_index = (write_index + CHUNK_SIZE) % BUFFER_SIZE;
        &mut *(chunk_ptr as *mut [T; CHUNK_SIZE]) 
    }
}

and

impl ChunkReader {
    /// Creates a temporary array view into the chunk at the current position
    ///
    /// # Safety
    ///
    /// Accesses with the corresponding [`RingBuffer`] must be properly scheduled / synchronised
    #[inline]
    unsafe fn next_chunk (&'_ mut self) -> &'_ Chunk
    {
        let read_index = self.read_index;
        let chunk_ptr: *const UnsafeCell<T> =
            self.buffer.0[read_index ..][.. CHUNK_SIZE].as_ptr()
        ;
        // same as `& BUFFER_SIZE_MASK`, but it becomes slower instead of UB if BUFFER_SIZE were not a power of two.
        self.read_index = (read_index + CHUNK_SIZE) % BUFFER_SIZE;
        &*(chunk_ptr as *const [T; CHUNK_SIZE])
    }
}

Finally, if (and only if!) BUFFER_SIZE = 256 will always be true in your program, you could "optimize" it by storing the index in a u8, with self.index = self.index.wrapping_add(CHUNK_SIZE as u8) to update its value (since the 8-bit 255 = 0xff = 0b1111_1111 mask is then "automagically applied").
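
A sketch of that trick (only sound under that exact size assumption; the type name is mine):

// Only valid if the buffer really has 256 elements: the u8 wrap-around at
// 256 then plays the role of `& BUFFER_SIZE_MASK` for free.
struct ByteIndexedReader {
    read_index: u8,
}

impl ByteIndexedReader {
    fn advance(&mut self) -> usize {
        let index = self.read_index as usize; // always in 0..256 by construction
        self.read_index = self.read_index.wrapping_add(CHUNK_SIZE as u8);
        index
    }
}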

Each reader will have its own hard-coded size. All sizes will be powers of two, and CHUNK_SIZE will be limited to BUFFER_SIZE >> 1 (and constrained by some assertions). (The scheduler will ensure that readers and the writer always access different hemispheres.) I'm going to do more bit-masking with the indices, and that would break in other places if I didn't use power-of-two sizes.

Thanks for the byte wrap hint, but 256 was just an example. I'll stick with masking. I've seen in the disassembly that this completely avoids range-checks, which is great.

And thanks a lot for this trick!

self.buffer.0[write_index ..][.. CHUNK_SIZE].as_ptr()

No need to yell at me. A simple "add unsafe to methods A/B because the contract cannot be enforced by the compiler" would have been enough. I'll update the code.

Not yelling at you, I apologise if it looked like it. I always try to make my posts "skimming-friendly" with some bold weight every now and then, and in this case I wanted it written crystal clear for everyone to see (that is, I wasn't targeting you specifically) what unsafe means in a function's declaration :wink:

Hmm, I suggested that to get an added size check regarding the overflow-on-non-divisibility thing :sweat_smile:, so you might want to go "back" to using:

self.buffer.0.get_unchecked(write_index) as *const UnsafeCell<T> as *mut T as *mut [T; CHUNK_SIZE]
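
In context, a sketch of next_chunk_mut using that expression (the debug_assert is my addition, standing in for the bounds check that get_unchecked drops in release builds):

impl RingBuffer {
    /// # Safety
    ///
    /// Same scheduling contract as before; additionally, the divisibility
    /// invariant must hold, since the slice bounds check is now gone.
    #[inline]
    unsafe fn next_chunk_mut(&mut self) -> &mut Chunk {
        let write_index = self.write_index & BUFFER_SIZE_MASK;
        debug_assert!(write_index + CHUNK_SIZE <= BUFFER_SIZE);
        let chunk_ptr = self.buffer.0.get_unchecked(write_index)
            as *const UnsafeCell<T> as *mut T as *mut Chunk;
        self.write_index += CHUNK_SIZE;
        &mut *chunk_ptr
    }
}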

Sorry, but I'm really getting annoyed by the fact that nearly every post in this thread is about the obvious data race, which I'm totally aware of. I know how dangerous data races are and I know how to avoid them. I know about the required synchronization primitives and all that. I simply wanted to know whether my usage of unsafe has consequences I'm not aware of.

Back on topic: does the unsafe declaration on this method only tell the caller that the method is unsafe? Or does it also allow additional unsafe behaviour within the method (like disabling bounds checks)?

I know this was your intention :wink: but I have other uses in many places.

Both. But when you only want the latter (i.e. a safe function that wants to perform unsafe operations under the hood), you do what you did before:

fn name (args: ()) -> RetTy // no unsafe => you "promise" callers no UB can ever happen
{ unsafe {
    // you can do unsafe stuff here, such as `.get_unchecked()`    
}}
unsafe fn name (args: ()) -> RetTy // unsafe => warning, caller responsibility
{
    // you can do unsafe stuff here too
}

I wanted to minimize the scope of unsafe, which seems to be impossible as soon as I tell the outside world that my function is special.

I updated the code. I had to add UnsafeSync in order to use it in threads at all. (see Unsafely share memory between threads (single sender, multiple receivers) - #24 by leudz)

The missing array initializer is pretty annoying - Arc<UnsafeSync<UnsafeCell<[T; N]>>> would have been easy, but Arc<UnsafeSync<[UnsafeCell<T>; N]>> is meh :nauseated_face:

There is something I don't understand: you said that a chunk can only be accessed by either a reader or the writer at any one time, so why not go with

Arc<[UnsafeCell<Chunk>; BUFFER_SIZE / CHUNK_SIZE]>

Maybe I'm saying something idiotic, but to me it feels simpler, doesn't it?
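
For what it's worth, the reader then needs no pointer cast at all (a sketch with the sizes hard-coded; the Sync newtype from earlier is still needed to actually share it across threads):

use std::cell::UnsafeCell;
use std::sync::Arc;

const CHUNK_SIZE: usize = 1 << 4;
const CHUNK_COUNT: usize = (1 << 7) / CHUNK_SIZE; // BUFFER_SIZE / CHUNK_SIZE
type Chunk = [f32; CHUNK_SIZE];

struct ChunkReader {
    // (still wants an `unsafe impl Sync` newtype around the array, as above)
    buffer: Arc<[UnsafeCell<Chunk>; CHUNK_COUNT]>,
    read_index: usize,
}

impl ChunkReader {
    /// # Safety
    /// Caller must schedule accesses so no chunk is read while being written.
    unsafe fn next_chunk(&mut self) -> &Chunk {
        // UnsafeCell::get() already yields a *mut Chunk; no cast gymnastics.
        let chunk = &*self.buffer[self.read_index % CHUNK_COUNT].get();
        self.read_index += 1;
        chunk
    }
}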


Or to support multiple potentially-concurrent readers XOR a writer:

RwLock<[UnsafeCell<Chunk>; BUFFER_SIZE / CHUNK_SIZE]>

IMO that is not the primary reason why so many of us commented on the data race. It is that rustc is permitted to optimize your program into incorrect behavior wherever you have UB, and is increasingly likely to do so as its MIR optimizations, and those of its LLVM backend, improve with successive releases. Use of UnsafeCell inhibits those inter-cell optimizations, which is what most of us were concerned about.


Create a newtype, as you did, but where the [UnsafeCell<T>; BUFFER_SIZE] (or [UnsafeCell<Chunk>; BUFFER_SIZE / CHUNK_SIZE], as suggested by @leudz) is "hardcoded" into the newtype:

#[repr(transparent)]
struct UnsafeBuffer (
    [UnsafeCell<Chunk>; BUFFER_SIZE / CHUNK_SIZE]
);
// SAFETY: no `safe` function permits a data race, so this is fine
unsafe impl Sync for UnsafeBuffer {}

struct RingBuffer {
    buffer: Arc<UnsafeBuffer>,
    pub write_index: usize,
}

struct ChunkReader {
    buffer: Arc<UnsafeBuffer>,
    pub read_index: usize,
}

// wherever you used .buffer, you now need to write .buffer.0

Thanks for the suggestions, but there will be different readers, each having a different chunk size. Even the writer may have a chunk size that differs from all the readers' (but all sizes will be a constant power of 2).
