Hello, I wrote this code and intend to use it in production. Do you have any thoughts about it? Maybe I messed up the memory orderings or something like that?
use core::{
ops::{Deref, DerefMut},
ptr::{self, NonNull},
sync::atomic::{AtomicPtr, AtomicU8, Ordering},
};
use atomic_waker::AtomicWaker;
use crate::wait_until::wait_until;
/// State bit set by the client half once a `borrow` call is waiting for a value.
const RECEIVING: u8 = 0x01;
/// State bit set by the host half while a value is lent out and not yet returned.
const LENT: u8 = 0x02;
/// Shared storage for a single-slot channel that lends a `&mut T` from a host
/// to a client and waits for it to be given back.
///
/// Call `split` to obtain the host (lending) and client (borrowing) halves,
/// both of which borrow the fields of this struct.
#[derive(Debug)]
pub struct RefMutChannel<T> {
// Bit flags (`RECEIVING`, `LENT`) describing the current handshake phase.
state: AtomicU8,
// Slot holding the pointer to the lent value; written by `lend`, taken by `borrow`.
ptr: AtomicPtr<T>,
// Waker the borrowing side waits on; woken by `lend` after a value is published.
borrower: AtomicWaker,
// Waker the lending side waits on; woken when the borrow guard is dropped.
lender: AtomicWaker,
}
/// Lending half of a `RefMutChannel`, created by `RefMutChannel::split`.
///
/// Holds shared references to the channel's fields; `lend` is the only
/// operation it offers.
pub struct RefMutChannelHost<'a, T> {
// Handshake flags shared with the client half.
state: &'a AtomicU8,
// Pointer slot this half fills when lending.
ptr: &'a AtomicPtr<T>,
// Woken by this half once a value has been published.
borrower: &'a AtomicWaker,
// Waited on by this half until the value is given back.
lender: &'a AtomicWaker,
}
/// Borrowing half of a `RefMutChannel`, created by `RefMutChannel::split`.
///
/// Holds shared references to the channel's fields; `borrow` is the only
/// operation it offers.
pub struct RefMutChannelClient<'a, T> {
// Handshake flags shared with the host half.
state: &'a AtomicU8,
// Pointer slot this half empties when a value has been lent.
ptr: &'a AtomicPtr<T>,
// Waited on by this half until a value is lent.
borrower: &'a AtomicWaker,
// Handed to the borrow guard, which wakes it when the borrow ends.
lender: &'a AtomicWaker,
}
/// Exclusive access to a value lent through a [`RefMutChannel`].
///
/// Returned by [`RefMutChannelClient::borrow`]. Dereferences to the lent `T`.
/// Dropping the guard clears the channel state and wakes the lender so that
/// [`RefMutChannelHost::lend`] can complete.
#[must_use]
pub struct RefMutGuard<'a, T> {
    /// Pointer taken out of the channel slot; derived from the lender's `&mut T`.
    value: NonNull<T>,
    /// Resets the channel state and wakes the lender when dropped.
    _borrow_guard: BorrowGuard<'a>,
    /// Makes the guard invariant in `T`, exactly like `&'a mut T`.
    ///
    /// `NonNull<T>` on its own is covariant in `T`. Without this marker safe
    /// code could coerce a `RefMutGuard<&'static U>` into a
    /// `RefMutGuard<&'short U>`, store a short-lived reference through
    /// `DerefMut`, and leave the lender holding a dangling `&'static U`.
    _invariant: core::marker::PhantomData<&'a mut T>,
}

/// Ends a borrow: its `Drop` impl clears `state` and wakes `notifier`.
///
/// It is created at the start of `borrow`, so a cancelled `borrow` future
/// also releases the lender.
struct BorrowGuard<'a> {
    state: &'a AtomicU8,
    notifier: &'a AtomicWaker,
}

// SAFETY: every field is an atomic or a waker and carries no thread affinity.
// The `T: Send` bound is what matters: the channel exists to hand a `&mut T`
// from the lending side to the borrowing side, which may run on another thread.
// These explicit impls only narrow the auto impls (`AtomicPtr<T>` is `Send` for
// every `T`).
unsafe impl<T: Send> Send for RefMutChannel<T> {}
// SAFETY: see `RefMutChannel`; the half only holds shared references to its fields.
unsafe impl<T: Send> Send for RefMutChannelClient<'_, T> {}
// SAFETY: see `RefMutChannel`; the half only holds shared references to its fields.
unsafe impl<T: Send> Send for RefMutChannelHost<'_, T> {}
// SAFETY: the guard is semantically a `&mut T` plus references to atomics, so it
// can move between threads under the same bound as `&mut T`. Dropping it only
// performs an atomic store and a wake.
unsafe impl<T: Send> Send for RefMutGuard<'_, T> {}
// SAFETY: a shared reference to the guard only exposes `&T` (through `Deref`),
// so sharing it requires exactly `T: Sync`, as for `&mut T`.
unsafe impl<T: Sync> Sync for RefMutGuard<'_, T> {}

impl<T> RefMutChannel<T> {
    /// Splits the channel into its lending (host) and borrowing (client) halves.
    ///
    /// The channel is reset first. `&mut self` proves that no half from an
    /// earlier split is still in use, so any flags or pointer left behind
    /// (for example by a leaked `borrow` future, or after the borrower thread
    /// was terminated) are stale. Clearing them prevents a fresh client from
    /// observing an old `LENT` flag and dereferencing a dangling pointer.
    pub fn split(&mut self) -> (RefMutChannelHost<'_, T>, RefMutChannelClient<'_, T>) {
        *self.state.get_mut() = 0;
        *self.ptr.get_mut() = core::ptr::null_mut();

        let Self {
            state,
            ptr,
            borrower,
            lender,
        } = &*self;
        (
            RefMutChannelHost {
                state,
                ptr,
                borrower,
                lender,
            },
            RefMutChannelClient {
                state,
                ptr,
                borrower,
                lender,
            },
        )
    }
}

impl<'a, T: Send> RefMutChannelHost<'a, T> {
    /// Lends `value` to the client half and waits until it is given back.
    ///
    /// Returns immediately, without lending anything, when the client has not
    /// announced a pending `borrow` (the `RECEIVING` flag is not set).
    ///
    /// # Safety
    /// While the returned future is pending, the borrower may hold a pointer
    /// to `value`. You must ensure one of the following:
    /// - The future is polled to completion and is neither leaked nor dropped
    ///   early. Essentially, you must block on that future.
    /// - The borrower thread is hard-terminated before the future is dropped.
    ///
    /// This lib has no platform dependencies, so the user must use them or
    /// other means to guarantee soundness.
    pub async unsafe fn lend(&mut self, value: &mut T) {
        if self.state.load(Ordering::Acquire) & RECEIVING == 0 {
            return;
        }

        // Publish the pointer first. The Release on `fetch_or` orders this
        // store before the LENT flag, and the borrower's Acquire load of the
        // flag then makes the pointer visible.
        self.ptr.store(ptr::from_mut(value), Ordering::Relaxed);
        self.state.fetch_or(LENT, Ordering::Release);
        self.borrower.wake();

        // The borrow guard clears the state with Release when it is dropped,
        // so observing LENT == 0 here also makes the borrower's writes to
        // `value` visible.
        wait_until(self.lender, || {
            (self.state.load(Ordering::Acquire) & LENT == 0).then_some(())
        })
        .await;

        // Normally `borrow` has already emptied the slot. A `borrow` future
        // that is dropped after LENT was set but before it took the pointer
        // clears the state without touching the slot, so emptiness is not an
        // invariant. Reset the slot unconditionally instead of asserting, so
        // no pointer to `value` outlives this call.
        self.ptr.store(ptr::null_mut(), Ordering::Relaxed);
    }
}

impl<'a, T: Send> RefMutChannelClient<'a, T> {
    /// Borrows the value from the host.
    ///
    /// Sets the `RECEIVING` flag, waits until the host has lent a value, and
    /// returns a guard giving exclusive access to it. Dropping the guard (or
    /// this future, if it is cancelled) clears the channel state and wakes the
    /// lender.
    ///
    /// Note that leaking the future or its return value (assuming the host
    /// upholds its safety requirements) will block the lender thread forever
    /// or until this thread terminates.
    ///
    /// # Panics
    /// Panics if the `LENT` flag is observed while the pointer slot is null.
    pub async fn borrow<'b>(&'b mut self) -> RefMutGuard<'b, T> {
        self.state.fetch_or(RECEIVING, Ordering::Release);

        // Created before the first await so that cancelling this future
        // still resets the state and wakes the lender.
        let borrow_guard = BorrowGuard {
            state: self.state,
            notifier: self.lender,
        };

        wait_until(self.borrower, || {
            (self.state.load(Ordering::Acquire) & LENT == LENT).then_some(())
        })
        .await;

        // The Acquire load that observed LENT makes the lender's Relaxed
        // pointer store visible here.
        let value = self.ptr.swap(core::ptr::null_mut(), Ordering::Relaxed);
        RefMutGuard {
            value: NonNull::new(value).expect("Notified but value is null"),
            _borrow_guard: borrow_guard,
            _invariant: core::marker::PhantomData,
        }
    }
}
impl Drop for BorrowGuard<'_> {
    /// Ends the borrow: clears every state flag, then wakes the waiting side.
    fn drop(&mut self) {
        let Self { state, notifier } = self;
        // Release pairs with the Acquire load on the side waiting for the
        // state change, so writes made before this drop are visible to it.
        state.store(0, Ordering::Release);
        notifier.wake();
    }
}
impl<T> Deref for RefMutGuard<'_, T> {
    type Target = T;

    /// Gives shared access to the lent value.
    fn deref(&self) -> &T {
        let lent = self.value.as_ptr();
        // SAFETY: `lent` is the non-null pointer `borrow` took out of the
        // channel slot, which `lend` filled from a live `&mut T`. The lender
        // does not touch the value while this guard is alive, provided it
        // upholds the safety contract of `lend` (the lending future, and with
        // it the referent, must outlive the borrow).
        unsafe { &*lent }
    }
}
impl<T> DerefMut for RefMutGuard<'_, T> {
    /// Gives exclusive access to the lent value.
    fn deref_mut(&mut self) -> &mut T {
        let lent = self.value.as_ptr();
        // SAFETY: `lent` is the non-null pointer `borrow` took out of the
        // channel slot, which `lend` filled from a live `&mut T`. The lender
        // does not touch the value while this guard is alive, provided it
        // upholds the safety contract of `lend`. `&mut self` guarantees that
        // no other reference obtained through this guard is active.
        unsafe { &mut *lent }
    }
}
impl<T> Default for RefMutChannel<T> {
fn default() -> Self {
Self {
state: Default::default(),
ptr: Default::default(),
borrower: Default::default(),
lender: Default::default(),
}
}
}
I also have the same code for shared references rather than mutable ones. The only notable change from this code is a `Sync`
bound instead of `Send`.
Thank you in advance!