Review of unsafe async data structure for sending `&/&mut` references

Hello, I wrote the code below and intend to use it in production. Do you have any thoughts about it? Maybe I messed up the memory orderings or something like that?

use core::{
    ops::{Deref, DerefMut},
    ptr::{self, NonNull},
    sync::atomic::{AtomicPtr, AtomicU8, Ordering},
};

use atomic_waker::AtomicWaker;

use crate::wait_until::wait_until;

const RECEIVING: u8 = 0b0000_0001;
const LENT: u8 = 0b0000_0010;

#[derive(Debug)]
pub struct RefMutChannel<T> {
    state: AtomicU8,
    ptr: AtomicPtr<T>,
    borrower: AtomicWaker,
    lender: AtomicWaker,
}

pub struct RefMutChannelHost<'a, T> {
    state: &'a AtomicU8,
    ptr: &'a AtomicPtr<T>,
    borrower: &'a AtomicWaker,
    lender: &'a AtomicWaker,
}

pub struct RefMutChannelClient<'a, T> {
    state: &'a AtomicU8,
    ptr: &'a AtomicPtr<T>,
    borrower: &'a AtomicWaker,
    lender: &'a AtomicWaker,
}

#[must_use]
pub struct RefMutGuard<'a, T> {
    value: NonNull<T>,
    _borrow_guard: BorrowGuard<'a>,
}

struct BorrowGuard<'a> {
    state: &'a AtomicU8,
    notifier: &'a AtomicWaker,
}

unsafe impl<T: Send> Send for RefMutChannel<T> {}
unsafe impl<T: Send> Send for RefMutChannelClient<'_, T> {}
unsafe impl<T: Send> Send for RefMutChannelHost<'_, T> {}

impl<T> RefMutChannel<T> {
    pub fn split(&mut self) -> (RefMutChannelHost<'_, T>, RefMutChannelClient<'_, T>) {
        let state = &self.state;
        let ptr = &self.ptr;
        let borrower = &self.borrower;
        let lender = &self.lender;
        (
            RefMutChannelHost {
                state,
                ptr,
                borrower,
                lender,
            },
            RefMutChannelClient {
                state,
                ptr,
                borrower,
                lender,
            },
        )
    }
}

impl<'a, T: Send> RefMutChannelHost<'a, T> {
    /// # Safety
    /// You must ensure one of the following:
    /// - The future must be polled to completion and not leaked. Essentially,
    ///   you must block on that future.
    /// - Hard terminate the borrower thread before dropping the future.
    ///
    /// This library has no platform dependencies, so the user must rely on
    /// platform facilities or other means to guarantee soundness.
    pub async unsafe fn lend(&mut self, value: &mut T) {
        if self.state.load(Ordering::Acquire) & RECEIVING == 0 {
            return;
        }
        self.ptr.store(ptr::from_mut(value), Ordering::Relaxed);
        self.state.fetch_or(LENT, Ordering::Release);
        self.borrower.wake();
        wait_until(self.lender, || {
            (self.state.load(Ordering::Acquire) & LENT == 0).then_some(())
        })
        .await;
        debug_assert_eq!(self.ptr.load(Ordering::Relaxed), ptr::null_mut());
    }
}

impl<'a, T: Send> RefMutChannelClient<'a, T> {
    /// Borrow the value from the host.
    ///
    /// Note that leaking the future or its return value (assuming the safety requirements
    /// are upheld) will block the lender thread forever or until this thread terminates.
    pub async fn borrow<'b>(&'b mut self) -> RefMutGuard<'b, T> {
        self.state.fetch_or(RECEIVING, Ordering::Release);
        let borrow_guard = BorrowGuard {
            state: self.state,
            notifier: self.lender,
        };
        wait_until(self.borrower, || {
            (self.state.load(Ordering::Acquire) & LENT == LENT).then_some(())
        })
        .await;
        let value = self.ptr.swap(core::ptr::null_mut(), Ordering::Relaxed);
        RefMutGuard {
            value: NonNull::new(value).expect("Notified but value is null"),
            _borrow_guard: borrow_guard,
        }
    }
}

impl<'a> Drop for BorrowGuard<'a> {
    fn drop(&mut self) {
        self.state.store(0, Ordering::Release);
        self.notifier.wake();
    }
}

impl<'a, T> Deref for RefMutGuard<'a, T> {
    type Target = T;
    fn deref(&self) -> &Self::Target {
        // Safety: the lender does not access the value while this RefMutGuard is alive,
        //         provided the caller of `lend` upholds its safety contract
        unsafe { self.value.as_ref() }
    }
}

impl<'a, T> DerefMut for RefMutGuard<'a, T> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        // Safety: the lender does not access the value while this RefMutGuard is alive,
        //         provided the caller of `lend` upholds its safety contract
        unsafe { self.value.as_mut() }
    }
}

impl<T> Default for RefMutChannel<T> {
    fn default() -> Self {
        Self {
            state: Default::default(),
            ptr: Default::default(),
            borrower: Default::default(),
            lender: Default::default(),
        }
    }
}
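
For anyone checking the orderings, the handshake on `state` and `ptr` can be isolated in a std-only sketch. This is not the code above: it replaces the wakers with spin loops, drops the `RECEIVING` bit, and uses `std::thread::scope` to supply the guarantee that `lend` asks of its caller. `lend_and_increment` and `slot` are hypothetical names used only for illustration.

```rust
use std::hint;
use std::ptr;
use std::sync::atomic::{AtomicPtr, AtomicU8, Ordering};
use std::thread;

const LENT: u8 = 0b0000_0010;

/// Lends `value` to a scoped thread, which increments it and hands it back.
fn lend_and_increment(value: &mut u32) {
    let state = AtomicU8::new(0);
    let slot: AtomicPtr<u32> = AtomicPtr::new(ptr::null_mut());
    let raw: *mut u32 = value;

    thread::scope(|s| {
        // Borrower: wait for LENT, take the pointer, use it, then release.
        s.spawn(|| {
            // The Acquire load pairs with the lender's Release `fetch_or`,
            // so the Relaxed pointer store is visible once LENT is seen.
            while state.load(Ordering::Acquire) & LENT == 0 {
                hint::spin_loop();
            }
            let p = slot.swap(ptr::null_mut(), Ordering::Relaxed);
            // SAFETY: the lender only spins until LENT is cleared, so this
            // thread has exclusive access to the value behind `p`.
            unsafe { *p += 1 };
            // The Release store publishes the write to the value.
            state.store(0, Ordering::Release);
        });

        // Lender: publish the pointer, then set LENT with Release.
        slot.store(raw, Ordering::Relaxed);
        state.fetch_or(LENT, Ordering::Release);
        // The Acquire load pairs with the borrower's Release store, so the
        // increment is visible once LENT is observed as cleared.
        while state.load(Ordering::Acquire) & LENT != 0 {
            hint::spin_loop();
        }
    });
}
```

In this reduced form the Relaxed accesses to the pointer are ordered by the Release/Acquire pair on `state`. It says nothing about the waker registration in `wait_until`, which is not shown in the post.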

I also have the same code for shared references rather than mutable ones. The only notable change from the mutable version is a `Sync` bound instead of `Send`.

Thank you in advance!

I can't comment on the soundness of the synchronization method, but regarding your safety documentation:

    /// - Hard terminate the borrower thread before dropping the future.

It is unsound to terminate a thread in any way other than the normal means of letting its code return or unwind, basically for the same reason this function is unsafe: the thread’s stack might own data borrowed by another thread. There are also liveness hazards in case the thread is holding a lock.[1]

Therefore, you should not suggest doing so to users.


  1. Of course, if you control 100% of the code that runs on that thread (not even calling any library code) and thereby know it does neither of those things, then you can kill threads. This sort of thing has, as far as I know, only been used successfully and soundly in programs like virtual machines which interpret or JIT-compile the application code the thread runs; in principle it should also work for “pure computation” sorts of tasks.

It is sound to leak anything in Rust. Destructors cannot be relied on as a safety guarantee.
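
As a concrete illustration of the leaking point, here is a minimal std-only sketch. `ReleaseOnDrop` and the two helper functions are hypothetical names, not part of the posted code. `mem::forget` is a safe function, so a guard's destructor may simply never run.

```rust
use std::mem;
use std::sync::atomic::{AtomicBool, Ordering};

/// Hypothetical guard that clears a "lent" flag when dropped.
struct ReleaseOnDrop<'a>(&'a AtomicBool);

impl Drop for ReleaseOnDrop<'_> {
    fn drop(&mut self) {
        self.0.store(false, Ordering::Release);
    }
}

/// Drops the guard normally; the destructor clears the flag.
fn lent_after_drop() -> bool {
    let lent = AtomicBool::new(true);
    drop(ReleaseOnDrop(&lent));
    lent.load(Ordering::Acquire)
}

/// Leaks the guard in safe code; the destructor never runs.
fn lent_after_forget() -> bool {
    let lent = AtomicBool::new(true);
    mem::forget(ReleaseOnDrop(&lent));
    lent.load(Ordering::Acquire)
}
```

Here `lent_after_drop()` returns `false` and `lent_after_forget()` returns `true`. In the posted channel the analogous case is a forgotten `RefMutGuard`: `LENT` is never cleared, so the lender keeps waiting.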

Removing something you don't own isn't the same as leaking. Some soundness does depend on destructors running (or an abort happening before stack frame removal).

See

As a simple std-only example of that, std::thread::scope depends on proper unwinding for soundness. It doesn't depend on anything not being leaked, but it does depend on the stack frame it creates being unwound through, rather than discarded. Other functions in various libraries do the same.
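
A minimal sketch of that std-only case, with `sum_halves` as a hypothetical helper. The spawned threads borrow `data` from the caller's stack, which is only sound because `thread::scope` joins every spawned thread before it returns.

```rust
use std::thread;

/// Sums the two halves of `data` on two scoped threads.
fn sum_halves(data: &[u64]) -> u64 {
    let (left, right) = data.split_at(data.len() / 2);
    thread::scope(|s| {
        // Both closures borrow from the caller's stack frame. That frame must
        // stay alive until the threads finish, which the scope guarantees.
        let a = s.spawn(|| left.iter().sum::<u64>());
        let b = s.spawn(|| right.iter().sum::<u64>());
        a.join().unwrap() + b.join().unwrap()
    })
}
```

If the calling thread were forcibly terminated in the middle of the scope and its stack reclaimed, the two workers would be left reading freed memory. That is the same hazard as in the thread-killing option in the `lend` safety docs.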

Thank you for the great explanations and links!