Lock-Free, One-Way Channel Using Unsafe Rust

I'm playing with some unsafe Rust code. The following is an incomplete one-way channel for passing data from a writing thread to a reading thread, where I aim to minimize the execution cost by ensuring lock-free execution and minimal atomic synchronization. The general idea is to implement a ring buffer-backed channel.

use std::sync::Arc;
use core::sync::atomic::AtomicUsize;
use core::sync::atomic::Ordering::Relaxed;
use core::pin::Pin;
use core::task::{Context, Poll};
use futures::task::AtomicWaker;
use futures::{Stream, StreamExt};

// *****************************************************************************

pub struct Sender<T> {
    pub(crate) buf: Arc<[T; 5]>,
    pub(crate) waker: Arc<AtomicWaker>,
    pub(crate) rindex: Arc<AtomicUsize>,
    pub(crate) windex: Arc<AtomicUsize>,
    pub(crate) closed: Arc<bool>,
}

impl<T> Sender<T> {
    pub fn push(&mut self, data: T) {

        let windex = self.windex.load(Relaxed);
        let ptr = Arc::as_ptr(&self.buf) as *mut T;
        unsafe {
            ptr.add(windex).write(data);
        }

        self.windex.fetch_add(1, Relaxed);
        self.waker.wake();
    }

    pub fn close(&mut self) {
        let ptr = Arc::as_ptr(&self.closed) as *mut bool;
        unsafe {
            ptr.write(true);
        }
    }
}

// *****************************************************************************

pub struct Receiver<T: Copy> {
    pub(crate) buf: Arc<[T; 5]>,
    pub(crate) waker: Arc<AtomicWaker>,
    pub(crate) rindex: Arc<AtomicUsize>,
    pub(crate) windex: Arc<AtomicUsize>,
    pub(crate) closed: Arc<bool>,
}

impl<T: Copy> Receiver<T> {
    pub fn pop(&mut self) -> Option<T> {
        let rindex = self.rindex.load(Relaxed);
        let windex = self.windex.load(Relaxed);

        if windex > rindex {
            self.rindex.fetch_add(1, Relaxed);
            Some(self.buf.get(rindex).unwrap().to_owned())
        } else {
            None
        }
    }

    pub fn is_closed(&self) -> bool {
        let rindex = self.rindex.load(Relaxed);
        let windex = self.windex.load(Relaxed);

        *self.closed && rindex == windex
    }
}

impl<T: Copy> Stream for Receiver<T> {
    type Item = T;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        self.waker.register(cx.waker());

        if self.is_closed() {
            Poll::Ready(None)
        } else if let Some(item) = self.pop() {
            Poll::Ready(Some(item))
        } else {
            Poll::Pending
        }
    }
}

// *****************************************************************************

#[tokio::main]
async fn main() {

    let buf = Arc::new([
        [0u8,1], [2,3], [4,5], [6,7], [8,9],
    ]);
    let waker = Arc::new(AtomicWaker::new());
    let rindex = Arc::new(AtomicUsize::new(0));
    let windex = Arc::new(AtomicUsize::new(0));
    let closed = Arc::new(false);

    let mut sender = Sender{
        buf: buf.clone(),
        waker: waker.clone(),
        rindex: rindex.clone(),
        windex: windex.clone(),
        closed: closed.clone(),
    };
    let mut receiver = Receiver{
        buf: buf,
        waker: waker,
        rindex,
        windex,
        closed,
    };

    let s = tokio::spawn(async move {
        let feed = [
            [100,101],
            [102,103],
            [104,105],
            [106,107],
            [108,109],
        ];
        for msg in feed {
            tokio::time::sleep(std::time::Duration::from_millis(300)).await;
            sender.push(msg);
        }
        sender.close();
    });
    let r = tokio::spawn(async move {
        while let Some(data) = receiver.next().await {
            println!("{:?}", data);
        }
    });
    s.await.unwrap();
    r.await.unwrap();
}

The code works on my Mac, but I'm not sure it would work correctly on other machines. I wonder whether mutating memory directly through a pointer, as in push, really makes the new data immediately visible to the other thread(s). I assume that waker, rindex, and windex need to be atomic types here, since reads could otherwise be corrupted. closed only ever transitions one way, and it is always read after the write in the other thread has completed. The buf items are never accessed by both threads simultaneously: in terms of indexes, the writer leads while the reader follows.

Since you are using Relaxed everywhere, it's not guaranteed to work, though it probably works on x86 anyway. The unsafe pointer write makes no difference to synchronization; you need at least one atomic release-acquire pair, e.g.:

// Use `Release` on the writer thread. `fetch_add` is a read-modify-write (RMW)
// operation: with `Release`, its load part is Relaxed and its store part is Release.
pub fn push(&mut self, data: T) {
    //...
    self.windex.fetch_add(1, Release);
    //...
}

// use `Acquire` on reader thread accordingly
pub fn pop(&mut self) -> Option<T> {
    let rindex = self.rindex.load(Relaxed);
    let windex = self.windex.load(Acquire);
    //...
}

But if you are only calling Receiver::pop() through Stream::poll_next, you are synchronized through the AtomicWaker, in which case you don't need to use atomics for windex or rindex in the first place.
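To make the release/acquire pairing concrete, here is a minimal sketch that is separate from the channel code above. `Slot` and `publish_and_read` are hypothetical names introduced only for illustration. The plain (non-atomic) write to `data` is guaranteed to be visible to the reader only because the `Release` store to `ready` is observed by an `Acquire` load.

```rust
use std::cell::UnsafeCell;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;

/// Hypothetical one-shot slot: a plain value guarded by an atomic flag.
struct Slot {
    data: UnsafeCell<u64>,
    ready: AtomicBool,
}

// SAFETY: `data` is written only before `ready` is set (by one thread) and
// read only after `ready` has been observed as `true` (by the other thread).
unsafe impl Sync for Slot {}

fn publish_and_read() -> u64 {
    let slot = Arc::new(Slot {
        data: UnsafeCell::new(0),
        ready: AtomicBool::new(false),
    });

    let writer = {
        let slot = Arc::clone(&slot);
        thread::spawn(move || {
            // Plain write, no synchronization by itself.
            unsafe { *slot.data.get() = 42 };
            // Release: everything written above is published with this store.
            slot.ready.store(true, Ordering::Release);
        })
    };

    // Acquire: once `true` is observed, the write to `data` is visible too.
    while !slot.ready.load(Ordering::Acquire) {
        std::hint::spin_loop();
    }
    let value = unsafe { *slot.data.get() };

    writer.join().unwrap();
    value
}
```

With `Relaxed` on both sides, the flag could be seen as `true` without any guarantee about `data`, which is the situation in the original `push`/`pop`.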

Arc::as_ptr() derives a pointer from an immutable reference. Writing through that is undefined behavior.

The "all fields are Arc" style coding is also a red flag. It looks like you want all of the Sender to be behind an Arc instead.

A couple of observations:

  • If you try to push more than 5 elements your Sender will write out of bounds.

  • Your design needs to initialize the Ts in the buf with some value before creating the channel. What if there's no value for them?

  • When you push, you leak the old value of the T stored in buf.

You may want to look at the implementation of the ringbuf crate for ideas/comparison.

It also seems racy (i.e., wrong) to separately load() and then fetch_add(). That renders the atomic useless, since some other thread can mess up the index between the two operations. The same is true for the operations that use multiple atomics (mainly pop() and is_closed()): for instance, there's nothing preventing the read and the write index from being updated out of sync by another thread.
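As a rough sketch of the difference, the check and the increment can be folded into one atomic read-modify-write with `AtomicUsize::fetch_update`, so no other thread can slip in between them. `claim_slot` is a hypothetical helper, not part of the code under discussion, and it only claims an index; publishing the data written to that slot still needs its own synchronization.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// Atomically claims the next free slot index, or returns `None` once all
/// `cap` slots are taken. (Hypothetical helper for illustration.)
fn claim_slot(next: &AtomicUsize, cap: usize) -> Option<usize> {
    next.fetch_update(Ordering::AcqRel, Ordering::Acquire, |current| {
        // The closure may run several times if another thread wins the race;
        // each attempt re-checks the bound against the freshest value.
        if current < cap {
            Some(current + 1)
        } else {
            None
        }
    })
    // `Ok(previous)` means the increment happened; `previous` is our slot.
    .ok()
}
```

In a strict single-producer, single-consumer setup each index has exactly one writer, so this concern applies mainly if more than one thread can ever call the same method.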

Similarly, both the sender and the receiver end up taking &mut self, making the atomics useless. For tracking initialization, you can use Option, which in turn makes the Copy bound unnecessary as well. (Bounds on type parameters of a UDT definition are an anti-pattern anyway.)

For lock-free shared mutability, you can and must use UnsafeCell, otherwise writing behind a shared reference is instant UB, as I mentioned earlier.
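A minimal sketch of those two points together, assuming a hypothetical single-slot type named `Slot`: `UnsafeCell` makes mutation behind `&self` legal, and `Option` tracks whether a value is present, so no placeholder value and no `Copy` bound are needed. The methods are `unsafe` because the caller has to rule out concurrent access; sharing the slot across threads would additionally need an `unsafe impl Sync` and real synchronization, which this sketch deliberately leaves out.

```rust
use std::cell::UnsafeCell;

/// Hypothetical single slot with interior mutability.
pub struct Slot<T> {
    value: UnsafeCell<Option<T>>,
}

impl<T> Slot<T> {
    pub fn empty() -> Self {
        Self { value: UnsafeCell::new(None) }
    }

    /// Stores `data` and hands back (rather than leaks) any previous value.
    ///
    /// # Safety
    /// No other access to this slot may happen during the call.
    pub unsafe fn put(&self, data: T) -> Option<T> {
        // SAFETY: the caller guarantees exclusive access for this call.
        let slot = unsafe { &mut *self.value.get() };
        slot.replace(data)
    }

    /// Moves the value out, leaving the slot empty.
    ///
    /// # Safety
    /// No other access to this slot may happen during the call.
    pub unsafe fn take(&self) -> Option<T> {
        // SAFETY: the caller guarantees exclusive access for this call.
        let slot = unsafe { &mut *self.value.get() };
        slot.take()
    }
}
```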

I tried to reduce the number of individual atomics, atomic operations, and minimize the scope of unsafe, but I couldn't get it to pass Miri even with all loads and stores being SeqCst. It's still racy and causes UB. I don't believe what you want can be implemented in such a naïve way, because using multiple atomics does seem like it will always be racy. Anyway, here's the incorrect, UB-exhibiting code if someone wants to figure out how to fix it, if that's at all possible.

I sincerely think you should just use the proven-safe abstractions and wrap the thing in a Mutex. Here's such an implementation that I believe to be correct.
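The linked implementation is not reproduced here. Purely as an illustration of the general idea, and not as the code referred to above, a bounded queue guarded by a `Mutex` might look like the following sketch; `BoundedQueue` and its methods are hypothetical names.

```rust
use std::collections::VecDeque;
use std::sync::Mutex;

/// Hypothetical bounded FIFO queue with all state behind one lock.
pub struct BoundedQueue<T> {
    items: Mutex<VecDeque<T>>,
    cap: usize,
}

impl<T> BoundedQueue<T> {
    pub fn with_capacity(cap: usize) -> Self {
        Self {
            items: Mutex::new(VecDeque::with_capacity(cap)),
            cap,
        }
    }

    /// Appends `data`, or gives it back as `Err` when the queue is full.
    pub fn push(&self, data: T) -> Result<(), T> {
        let mut items = self.items.lock().unwrap();
        if items.len() >= self.cap {
            return Err(data);
        }
        items.push_back(data);
        Ok(())
    }

    /// Removes and returns the oldest element, if any.
    pub fn pop(&self) -> Option<T> {
        self.items.lock().unwrap().pop_front()
    }
}
```

Because the length check and the mutation happen under the same lock, there is no window in which another thread can observe or change the queue halfway through an operation, and no `unsafe` is required.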

Thank you all for your valuable contributions. Since my initial post was only a draft, some of the comments seemed to deviate from the core question, which resulted in some confusion. To address this, I've taken some time to refine the idea, during which I incorporated several of your suggestions.

Please review the updated version; I'm eager to receive your detailed feedback. Do note that this code aims to handle the Single Producer Single Consumer (SPSC) case only. Any missing features have been highlighted in the comments for clarity. Additionally, I've included some explanatory notes that I believe will be helpful.

I'm looking forward to hearing your thoughts.

use std::sync::Arc;
use std::cell::UnsafeCell;
use std::sync::atomic::{AtomicUsize, Ordering, AtomicBool};
use std::mem::MaybeUninit;
use std::task::{Context, Poll};
use std::pin::Pin;
use futures::{Stream, StreamExt};
use futures::task::AtomicWaker;

// *****************************************************************************

pub(crate) struct Queue<T> {
    head: AtomicUsize,
    tail: AtomicUsize,
    buffer: Box<[UnsafeCell<MaybeUninit<T>>]>,
    cap: usize,
}

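// SAFETY: the queue moves each `T` from the producer thread to the consumer
// thread and never shares a `&T` between them, so `T: Send` is the bound that
// matters here (assuming single-producer, single-consumer use).
unsafe impl<T: Send> Sync for Queue<T> {}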

// TODO: Add error handling.
/// TODO: Address that it works for SPSC only.
impl<T> Queue<T> {
    pub fn with_capacity(cap: usize) -> Self {
        // TODO: Handle cap exceeds the allowed size.

        let buffer = (0..cap)
            .map(|_| {
                UnsafeCell::new(MaybeUninit::uninit())
            })
            .collect();

        Self {
            head: AtomicUsize::new(0),
            tail: AtomicUsize::new(0),
            buffer,
            cap,
        }
    }

    /// TODO: Address that it works for SPSC only.
    pub fn push(&self, data: T) {
        let head = Index::from_usize(self.head.load(Ordering::Relaxed));
        let tail = Index::from_usize(self.tail.load(Ordering::Acquire));

        let mut next_head = head.clone();
        next_head.inc(self.cap);

        if head.value() == tail.value() && head.wrap() != tail.wrap() {
            // TODO: Throw Error instead.
            panic!("buffer is full");
        }

        unsafe {
            (*self.buffer[head.value()].get()).as_mut_ptr().write(data);
        }

        self.head.store(next_head.into_usize(), Ordering::Release);
    }

    /// TODO: Address that it works for SPSC only.
    pub fn pop(&self) -> Option<T> {
        let head = Index::from_usize(self.head.load(Ordering::Acquire));
        let tail = Index::from_usize(self.tail.load(Ordering::Relaxed));

        if head.value() == tail.value() && head.wrap() == tail.wrap() {
            return None;
        }

        // NOTE: Once the item has been read out, this slot is logically
        // uninitialized again, so it must not be dropped again in Drop.
        let item = unsafe {
            (*self.buffer[tail.value()].get()).as_ptr().read()
        };

        let mut next_tail = tail.clone();
        next_tail.inc(self.cap);

        self.tail.store(next_tail.into_usize(), Ordering::Release);
        Some(item)
    }
}

impl<T> Drop for Queue<T> {
    fn drop(&mut self) {
        // Ensure remaining elements are properly dropped.
        // NOTE: We could simply call pop() on the remaining items but using
        // drop_in_place is faster.
        let head = Index::from_usize(*self.head.get_mut());
        let mut tail = Index::from_usize(*self.tail.get_mut());

        while tail.value() != head.value() || tail.wrap() != head.wrap() {
            unsafe {
                let item = &mut *self.buffer[tail.value()].get();
                item.as_mut_ptr().drop_in_place();
            }
            tail.inc(self.cap);
        }
    }
}

// *****************************************************************************

const WRAP_FLAG: usize = 1 << (std::mem::size_of::<usize>() * 8 - 1); // NOTE: Mask for the Most Significant Bit (MSB)

/// `Index` tracks a position within the buffer together with a "wrap" flag.
/// When packed into a `usize`, the flag is stored in the most significant bit
/// (MSB) and toggles each time the position wraps around the buffer. Because
/// the flag differs between consecutive passes, equal positions with equal
/// flags mean the queue is empty, while equal positions with different flags
/// mean it is full.
#[derive(Debug, Clone)]
pub(crate) struct Index {
    value: usize,
    wrap: bool,
}
impl Index {
    pub fn from_usize(v: usize) -> Self {
        Self {
            value: v & !WRAP_FLAG,
            wrap: v & WRAP_FLAG != 0,
        }
    }

    pub fn inc(&mut self, cap: usize) {
        self.value += 1;
        if self.value >= cap {
            self.value = 0;
            self.wrap = !self.wrap;
        }
    }

    pub fn as_usize(&self) -> usize {
        if self.wrap {
            self.value + WRAP_FLAG
        } else {
            self.value
        }
    }

    pub fn into_usize(self) -> usize {
        self.as_usize()
    }

    pub fn value(&self) -> usize {
        self.value
    }

    pub fn wrap(&self) -> bool {
        self.wrap
    }
}

// *****************************************************************************

pub(crate) struct State<T> {
    pub(crate) queue: Queue<T>,
    pub(crate) waker: AtomicWaker,
    pub(crate) closed: AtomicBool,
}

impl<T> State<T> {
    pub fn close(&self) {
        self.closed.store(true, Ordering::Release);
    }

    pub fn is_closed(&self) -> bool {
        self.closed.load(Ordering::Acquire)
    }
}

// *****************************************************************************

/// NOTE: When sender is dropped, the pipe is marked as `closed`.
pub struct Sender<T> {
    pub(crate) state: Arc<State<T>>,
}

impl<T> Sender<T> {
    pub fn send(&mut self, data: T) {
        self.state.queue.push(data);
        self.state.waker.wake();
    }
}

impl<T> Drop for Sender<T> {
    fn drop(&mut self) {
        self.state.close();
        self.state.waker.wake();
    }
}

// *****************************************************************************

/// NOTE: When sender is dropped, the pipe is marked as `closed` but the
/// receiver will pop all the items that have been pushed by the sender and
/// will exit afterwards.
pub struct Receiver<T> {
    pub(crate) state: Arc<State<T>>,
}

impl<T> Stream for Receiver<T> {
    type Item = T;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        self.state.waker.register(cx.waker());

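        if let Some(item) = self.state.queue.pop() {
            Poll::Ready(Some(item))
        } else if self.state.is_closed() {
            // The sender may have pushed a final item and closed between the
            // `pop` above and this check, so drain once more before ending.
            Poll::Ready(self.state.queue.pop())
        } else {
            Poll::Pending
        }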
        // TODO: Handle close event.
    }
}

// *****************************************************************************

pub fn pipe<T>(cap: usize) -> (Sender<T>, Receiver<T>) {
    let state = Arc::new(State {
        queue: Queue::with_capacity(cap),
        waker: AtomicWaker::new(),
        closed: AtomicBool::new(false),
    });
    let sender = Sender{state: state.clone()};
    let receiver = Receiver{state};
    (sender, receiver)
}

// *****************************************************************************

#[tokio::main]
async fn main() {
    let (mut tx, mut rx) = pipe(3);

    let t = tokio::spawn(async move {
        for i in 0..15 {
            tx.send(i);
            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
        }
    });

    while let Some(i) = rx.next().await {
        println!("[pop] {:?}", i);
    }

    t.await.unwrap();
}
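The wrap-flag scheme used by `Index` can also be exercised in isolation. The sketch below works directly on the packed `usize` representation; `advance`, `is_empty`, and `is_full` are hypothetical free functions written for this illustration, not part of the code above.

```rust
/// Most significant bit of a `usize`, used as the wrap flag.
const WRAP_FLAG: usize = 1 << (usize::BITS - 1);

/// Moves a packed index one step forward, flipping the flag on wrap-around.
fn advance(raw: usize, cap: usize) -> usize {
    let position = raw & !WRAP_FLAG;
    let wrap = raw & WRAP_FLAG;
    if position + 1 >= cap {
        // Position resets to 0 and the flag toggles.
        wrap ^ WRAP_FLAG
    } else {
        wrap | (position + 1)
    }
}

/// Same position and same flag: nothing left to read.
fn is_empty(head: usize, tail: usize) -> bool {
    head == tail
}

/// Same position but different flag: the writer is one full lap ahead.
fn is_full(head: usize, tail: usize) -> bool {
    head ^ tail == WRAP_FLAG
}
```

Without the flag, "empty" and "full" would both show up as equal positions, so one slot would have to stay unused to tell them apart.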

To highlight the fact that Queue is an SPSC queue only, I would mark push and pop as unsafe, with the precondition that each of them can only be called from one thread at a time.

I would also put the Queue and the AtomicWaker in the same Arc to avoid unnecessary allocations.

You could move the self.waker.register(cx.waker()); in Receiver inside the else branch, to avoid needlessly registering wakers in case the queue can be immediately popped from.
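One detail worth keeping in mind with this change: if the waker is registered only after a failed pop, the queue has to be checked once more after registering, otherwise an item pushed in between could go unnoticed and the task would never be woken. The sketch below shows that shape using only the standard library; `Shared`, `poll_recv`, and `NoopWake` are hypothetical, and the two `Mutex` fields are stand-ins for the lock-free queue and the `AtomicWaker`, chosen only so the example has no external dependencies.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

/// Stand-in shared state (hypothetical): the mutexes replace the lock-free
/// queue and the `AtomicWaker` from the thread.
pub struct Shared {
    pub queue: Mutex<VecDeque<u32>>,
    pub waker: Mutex<Option<Waker>>,
}

pub fn poll_recv(shared: &Shared, cx: &mut Context<'_>) -> Poll<u32> {
    // Fast path: an item is already there, so no waker registration is needed.
    if let Some(item) = shared.queue.lock().unwrap().pop_front() {
        return Poll::Ready(item);
    }

    // Slow path: register first ...
    *shared.waker.lock().unwrap() = Some(cx.waker().clone());

    // ... then check again, in case an item arrived before the registration.
    match shared.queue.lock().unwrap().pop_front() {
        Some(item) => Poll::Ready(item),
        None => Poll::Pending,
    }
}

/// A waker that does nothing, only needed to build a `Context` by hand.
pub struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}
```

Registering unconditionally at the top of `poll_next`, as in the posted code, avoids the second check at the cost of one registration per poll.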

That's an intriguing proposition. Even when the struct is marked pub(crate), is it advisable to label the pop and push methods as unsafe? I'm keen to understand the general norms for documenting or coding such scenarios. Does this mean that if a method doesn't cater to all use cases, it should be marked unsafe? From what I gather, you're suggesting that each method should also contain comments explaining its preconditions, am I correct?

No. A function must be marked unsafe exactly if calling it (with the "wrong" input) can cause undefined behavior.
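A small sketch of that rule, unrelated to the queue itself: the first function can cause undefined behavior when given a bad index, so it is `unsafe` and documents its precondition in a `# Safety` section; the second checks the precondition itself, so it can be a safe function. Both function names are hypothetical.

```rust
/// Reads `values[index]` without a bounds check.
///
/// # Safety
/// `index` must be less than `values.len()`; otherwise the read is out of
/// bounds, which is undefined behavior.
pub unsafe fn read_unchecked(values: &[u32], index: usize) -> u32 {
    // SAFETY: the caller promises that `index` is in bounds.
    unsafe { *values.get_unchecked(index) }
}

/// Safe wrapper: no input can cause undefined behavior, so no `unsafe` marker.
pub fn read_checked(values: &[u32], index: usize) -> Option<u32> {
    if index < values.len() {
        // SAFETY: `index` was just checked against the length.
        Some(unsafe { read_unchecked(values, index) })
    } else {
        None
    }
}
```

A method that merely does not support every use case, but cannot trigger undefined behavior, stays a safe function and reports the unsupported case through its return type or a panic.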

By the way, your example code above does not compile. After attempting a fix, the resulting code doesn't run under Miri.

Hum, sorry @H2CO3 about that. I have updated the code to incorporate some of the suggestions made by @SkiFire13. You can view the updated code here:

Playground

Oh, and regarding MIRI, it fails to run on Macs due to the tokio crate.

It's sound because it's not exposed to downstream crates, but its existence means you'll have to treat your crate as if it were one big unsafe block. More generally, when you have a pub(something) function that's actually unsafe, you have to consider the whole something scope as if it could do something unsafe, because it could call that function at any time without an unsafe block. Thus you either relax or lose the property that unsafe code can be proved sound in isolation, and instead have to consider all the code that could potentially call it.
