Fallen into the rabbit hole of implementing an oneshot channel (sync)

So while reading the great book Rust Atomics and Locks :+1: I fallen into the rabbit hole :rabbit: of implementing an oneshot channel myself without Arc just for fun and learning purposes. :grin:

Source code: Rust Playground

(Unfortunately my oneshot channel uses atomic-wait crate which is not included in Playground) :frowning:

During engagement and refactoring I stumbled about some abstractions pictured below with some details omitted:

AtomicState
Instead of relying on some integer constants, I wrapped an enum using #[repr(u32)] and mem::transmute():

#[repr(u32)]
enum State { ... }

struct AtomicState(AtomicU32);

impl AtomicState {
    fn load(&self, ordering: Ordering) -> State {
        let state = self.0.load(ordering);
        unsafe { transmute(state) }
    }
    fn store(&self, state: State, ordering: Ordering) {
        self.0.store(state as u32, ordering);
    }
    fn swap(&self, new_state: State, ordering: Ordering) -> State { ... }

    fn atomic_wait(&self, state: State) {
        atomic_wait::wait(&self.0, state as u32);
    }
    fn wake_one(&self) {
        atomic_wait::wake_one(&self.0);
    }
}

UnsafeVar
Purpose of UnsafeVar<T> is to access somewhat hassle free an unsafe variable to write a value into and read a value from and also drop the value via a non mutable reference. Should not be unsound because all critical methods are unsafe. Only the constructor is safe:

struct UnsafeVar<T>(UnsafeCell<MaybeUninit<T>>);

impl<T> UnsafeVar<T> {
    const fn uninit() -> Self {
        UnsafeVar(UnsafeCell::new(MaybeUninit::uninit()))
    }
    unsafe fn write(&self, value: T) {
        (*self.0.get()).write(value);
    }
    unsafe fn read(&self) -> T {
        (*self.0.get()).assume_init_read()
    }
    unsafe fn drop(&self) {
        (*self.0.get()).assume_init_drop();
    }
}

SharedBox
Don't know a better name. Maybe AliasedBox or something would fit too. Implements Clone and Deref and only drops the inner type using a manual unsafe drop() method.

I think implementing Clone is not unsound because the underlying Box is never dropped by a safe method and Deref only provides non mutable references:

struct SharedBox<T>(NonNull<T>);

impl<T> SharedBox<T> {
    fn new(value: T) -> Self {
        SharedBox(unsafe { NonNull::new_unchecked(Box::into_raw(Box::new(value))) })
    }
    unsafe fn drop(&self) {
        drop(Box::from_raw(self.0.as_ptr()));
    }
}
impl<T> Clone for SharedBox<T> {
    fn clone(&self) -> Self {
        SharedBox(self.0)
    }
}
impl<T> Deref for SharedBox<T> {
    type Target = T;
    fn deref(&self) -> &T {
        unsafe { self.0.as_ref() }
    }
}

Channel
The Channel itself gets shared via two SharedBox in the sender and receiver and only consists of an state and unsafe variable:

struct Channel<T> {
    state: AtomicState,
    value: UnsafeVar<T>,
}

Sender
Using these abstractions the implementation gets pretty boring:

struct Sender<T>(SharedBox<Channel<T>>);

impl<T> Sender<T> {
    fn send(self, value: T) -> Result<(), SendError<T>> {
        unsafe { self.0.value.write(value) };

        let result = match self.0.state.swap(State::Sent, Ordering::Release) {
            State::Empty => Ok(()),
            State::RecvWait => {
                self.0.state.wake_one();
                Ok(())
            }
            State::Closed => unsafe {
                let value = self.0.value.read();
                self.0.drop();
                Err(SendError(value))
            },
        };

        mem::forget(self);
        result
    }
    // ...
}

Receiver
The counterpart receiving end:

struct Receiver<T>(SharedBox<Channel<T>>);

impl<T> Receiver<T> {
    fn recv(self) -> Result<T, RecvError> {
        let result = loop {
            match self.0.state.swap(State::RecvWait, Ordering::Acquire) {
                State::Empty | State::RecvWait => {
                    self.0.state.atomic_wait(State::RecvWait);
                }
                State::Sent => break Ok(unsafe { self.0.value.read() }),
                State::Closed => break Err(RecvError),
            }
        };

        unsafe { self.0.drop() };
        mem::forget(self);
        result
    }
   // ...
}

Of course there is more code, I omitted quite a bit here. Full source code: Rust Playground
Admittedly Playground lacks the atomic-wait crate :frowning:

I would be very happy if someone could take a closer look at the source code and I would also welcome any kind of criticism.

Many thanks in advance!

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.