So while reading the great book Rust Atomics and Locks I fallen into the rabbit hole
of implementing an oneshot channel myself without
Arc
just for fun and learning purposes.
Source code: Rust Playground
(Unfortunately my oneshot channel uses atomic-wait crate which is not included in Playground)
During engagement and refactoring I stumbled about some abstractions pictured below with some details omitted:
AtomicState
Instead of relying on some integer constants, I wrapped an enum using #[repr(u32)]
and mem::transmute()
:
#[repr(u32)]
enum State { ... }
struct AtomicState(AtomicU32);
impl AtomicState {
fn load(&self, ordering: Ordering) -> State {
let state = self.0.load(ordering);
unsafe { transmute(state) }
}
fn store(&self, state: State, ordering: Ordering) {
self.0.store(state as u32, ordering);
}
fn swap(&self, new_state: State, ordering: Ordering) -> State { ... }
fn atomic_wait(&self, state: State) {
atomic_wait::wait(&self.0, state as u32);
}
fn wake_one(&self) {
atomic_wait::wake_one(&self.0);
}
}
UnsafeVar
Purpose of UnsafeVar<T>
is to access somewhat hassle free an unsafe variable to write a value into and read a value from and also drop the value via a non mutable reference. Should not be unsound because all critical methods are unsafe. Only the constructor is safe:
struct UnsafeVar<T>(UnsafeCell<MaybeUninit<T>>);
impl<T> UnsafeVar<T> {
const fn uninit() -> Self {
UnsafeVar(UnsafeCell::new(MaybeUninit::uninit()))
}
unsafe fn write(&self, value: T) {
(*self.0.get()).write(value);
}
unsafe fn read(&self) -> T {
(*self.0.get()).assume_init_read()
}
unsafe fn drop(&self) {
(*self.0.get()).assume_init_drop();
}
}
SharedBox
Don't know a better name. Maybe AliasedBox
or something would fit too. Implements Clone
and Deref
and only drops the inner type using a manual unsafe drop()
method.
I think implementing Clone
is not unsound because the underlying Box is never dropped by a safe method and Deref
only provides non mutable references:
struct SharedBox<T>(NonNull<T>);
impl<T> SharedBox<T> {
fn new(value: T) -> Self {
SharedBox(unsafe { NonNull::new_unchecked(Box::into_raw(Box::new(value))) })
}
unsafe fn drop(&self) {
drop(Box::from_raw(self.0.as_ptr()));
}
}
impl<T> Clone for SharedBox<T> {
fn clone(&self) -> Self {
SharedBox(self.0)
}
}
impl<T> Deref for SharedBox<T> {
type Target = T;
fn deref(&self) -> &T {
unsafe { self.0.as_ref() }
}
}
Channel
The Channel itself gets shared via two SharedBox
in the sender and receiver and only consists of an state and unsafe variable:
struct Channel<T> {
state: AtomicState,
value: UnsafeVar<T>,
}
Sender
Using these abstractions the implementation gets pretty boring:
struct Sender<T>(SharedBox<Channel<T>>);
impl<T> Sender<T> {
fn send(self, value: T) -> Result<(), SendError<T>> {
unsafe { self.0.value.write(value) };
let result = match self.0.state.swap(State::Sent, Ordering::Release) {
State::Empty => Ok(()),
State::RecvWait => {
self.0.state.wake_one();
Ok(())
}
State::Closed => unsafe {
let value = self.0.value.read();
self.0.drop();
Err(SendError(value))
},
};
mem::forget(self);
result
}
// ...
}
Receiver
The counterpart receiving end:
struct Receiver<T>(SharedBox<Channel<T>>);
impl<T> Receiver<T> {
fn recv(self) -> Result<T, RecvError> {
let result = loop {
match self.0.state.swap(State::RecvWait, Ordering::Acquire) {
State::Empty | State::RecvWait => {
self.0.state.atomic_wait(State::RecvWait);
}
State::Sent => break Ok(unsafe { self.0.value.read() }),
State::Closed => break Err(RecvError),
}
};
unsafe { self.0.drop() };
mem::forget(self);
result
}
// ...
}
Of course there is more code, I omitted quite a bit here. Full source code: Rust Playground
Admittedly Playground lacks the atomic-wait crate
I would be very happy if someone could take a closer look at the source code and I would also welcome any kind of criticism.
Many thanks in advance!