Atomic-synchronized UnsafeCell

I'm playing around with game engine programming at the moment, and found myself wanting a more advanced interior mutability primitive to deal with values that change on a regular schedule. The operating principle is to have a monotonically-increasing counter that represents update steps. Whenever a ClockCell sees a new value for this counter, it knows that there are no outstanding references to its contents, and can therefore modify them freely. Once this update is done, downstream code can get a shared reference to the cell's contents bounded by a lifetime that must end before the counter can be increased again.

In addition to a general code review, I'd also appreciate any thoughts along these lines:

  • Would some combination of pre-existing primitives do a comparable job, without the inherent risk of bespoke unsafe code?
  • I haven't used atomics for memory synchronization much, so I'm not perfectly confident that it won't seize up if I try to use this in a multithreaded context. I expect that most panics will leave the system deadlocked, but that's acceptable at the moment.
  • I haven't yet figured out how to use the tools in std::sync to reasonably share a single clock among multiple threads. Ideally, I'd have a RwLock<Clock<_>> with a Condvar that wakes up the readers whenever the clock ticks, and a Barrier to ensure that all the readers actually take their read locks before the control thread ticks the clock again, but I can't come up with the right combination to put them all together, especially with an unknown number of readers.
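For a known number of readers, one possible arrangement is to drop the Condvar and have the control thread and all readers share a single std::sync::Barrier. Everyone waits on it twice per tick: the first wait releases the readers after the tick, and the second holds the control thread until every reader has finished. The RwLock then never actually blocks and only provides the &/&mut split. This is a sketch only. The Clock here is a hypothetical stand-in for the post's Clock<Id>, and run is an illustrative helper.

```rust
use std::sync::{Arc, Barrier, RwLock};
use std::thread;

// Hypothetical stand-in for the post's Clock: advancing it needs &mut.
struct Clock {
    timecode: u32,
}

impl Clock {
    fn tick(&mut self) {
        self.timecode += 1;
    }
    fn now(&self) -> u32 {
        self.timecode
    }
}

/// Runs `ticks` ticks with `readers` reader threads; returns what each reader saw.
fn run(readers: usize, ticks: u32) -> Vec<Vec<u32>> {
    let clock = Arc::new(RwLock::new(Clock { timecode: 1 }));
    // All readers plus the control thread take part in every wait.
    let barrier = Arc::new(Barrier::new(readers + 1));

    let handles: Vec<_> = (0..readers)
        .map(|_| {
            let clock = Arc::clone(&clock);
            let barrier = Arc::clone(&barrier);
            thread::spawn(move || {
                let mut seen = Vec::new();
                for _ in 0..ticks {
                    barrier.wait(); // phase 1: wait until the tick has happened
                    // Shared, read-only work; the read guard is dropped at the end of the statement.
                    seen.push(clock.read().unwrap().now());
                    barrier.wait(); // phase 2: report that this reader is done
                }
                seen
            })
        })
        .collect();

    for _ in 0..ticks {
        // No reader holds the read lock here: each is before or at its phase-1 wait.
        clock.write().unwrap().tick();
        barrier.wait(); // release the readers
        barrier.wait(); // wait until every reader has finished this tick
    }

    handles.into_iter().map(|h| h.join().unwrap()).collect()
}

fn main() {
    // Each reader sees every tick exactly once: [[2, 3, 4], [2, 3, 4]]
    println!("{:?}", run(2, 3));
}
```

std::sync::Barrier needs its participant count up front, so this does not directly solve the unknown-number-of-readers case. Readers that come and go would need some other way to register with the control thread.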

I've reproduced the main implementation below, and the playground has a (single-threaded) example with some of the tools I've built on top of this foundation.

pub mod clock {
    pub type Timecode = u32;
    type AtomicTimecode = std::sync::atomic::AtomicU32;
    const UPDATING: Timecode = Timecode::MAX;

    use crate::unique::Unique;
    use std::cell::UnsafeCell;
    use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release};

    /// A monotonic counter that can be uniquely identified.
    #[derive(Debug)]
    pub struct Clock<Id> {
        identity: Id,
        timecode: Timecode,
    }

    impl<Id: Unique> Clock<Id> {
        pub fn new() -> Self {
            Clock {
                identity: Id::new_unique(),
                timecode: 1,
            }
        }

        pub fn tick(&mut self) {
            let new = self.timecode + 1;
            if new == UPDATING {
                panic!("Clock overflow");
            } else {
                self.timecode = new;
            }
        }

        pub fn now(&self) -> Timecode {
            self.timecode
        }

        pub fn id(&self) -> Id {
            self.identity
        }

        pub fn new_cell<T>(&self, val: T) -> ClockCell<Id, T> {
            ClockCell {
                clock_id: self.id(),
                mtime: AtomicTimecode::new(self.now()),
                obj: UnsafeCell::new(val),
            }
        }

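        /// Like new_cell, but starts with mtime = 0, so the first update()
        /// call always runs its closure.
        pub fn new_cell_uninit<T>(&self, val: T) -> ClockCell<Id, T> {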
            ClockCell {
                clock_id: self.id(),
                mtime: AtomicTimecode::new(0),
                obj: UnsafeCell::new(val),
            }
        }
    }

    /// A synchronized cell that will update T at most once per
    /// clock cycle.
    pub struct ClockCell<Id, T: ?Sized> {
        // Which clock controls this cell?
        clock_id: Id,

        // Contains either the timecode of the last update,
        // or UPDATING if an update is in progress.
        mtime: AtomicTimecode,
        obj: UnsafeCell<T>,
    }

    unsafe impl<Id: Unique, T: ?Sized + Send> Send for ClockCell<Id, T> {}
    // Sync hands out &T to several threads at once, so T must be Sync too.
    unsafe impl<Id: Unique, T: ?Sized + Send + Sync> Sync for ClockCell<Id, T> {}

    impl<Id: Unique, T: ?Sized> ClockCell<Id, T> {
        pub fn update<'a>(&'a self, clock: &'a Clock<Id>, update: impl FnOnce(&mut T)) -> &'a T {
            // Ensure we've been given the correct token
            assert_eq!(self.clock_id, clock.id());

            let now = clock.now();
            let mtime = self.mtime.load(Acquire);
            if mtime == UPDATING {
                // Some other thread is running the update
                assert_eq!(self.spinwait(), now);
            } else if mtime < clock.now() {
                match self
                    .mtime
                    .compare_exchange(mtime, UPDATING, AcqRel, Acquire)
                {
                    Ok(_) => {
                        // We've locked T for updating.
                        unsafe {
                            update(&mut *self.obj.get());
                        }
                        self.mtime.store(now, Release);
                    }
                    Err(t) if t == UPDATING => {
                        // Some other thread is running the update
                        assert_eq!(self.spinwait(), now);
                    }
                    Err(t) if t == now => {
                        // Another thread completed the update between
                        // checks; do nothing
                    }
                    Err(_) => {
                        panic!("Unexpected value in atomic!");
                    }
                }
            }

            // No more changes to obj can occur until
            // clock.now() advances, which it can't do
            // until after the end of 'a
            unsafe { &*self.obj.get() }
        }

        /// Wait for an update to finish
        fn spinwait(&self) -> Timecode {
            loop {
                std::thread::yield_now();
                let tc = self.mtime.load(Acquire);
                if tc != UPDATING {
                    return tc;
                }
            }
        }

        pub fn last_modified(&self) -> Timecode {
            let mtime = self.mtime.load(Relaxed);
            if mtime != UPDATING {
                mtime
            } else {
                self.spinwait()
            }
        }
    }
}

// ------------------------------
// trait Unique
// ------------------------------

pub mod unique {
    use std::fmt::Debug;
    pub unsafe trait Unique:
        Copy + Clone + Eq + PartialEq + Debug + Send + Sync + 'static
    {
        /// Safety: No two calls may produce equal values
        fn new_unique() -> Self;
    }

    #[macro_export]
    macro_rules! def_unique_zst {
        ($viz:vis $name:ident) => {
            #[derive(Copy,Clone,Eq,PartialEq,Debug)]
            $viz struct $name(());

            unsafe impl $crate::unique::Unique for $name {
                fn new_unique()->Self {
                    use std::sync::atomic::{AtomicBool,Ordering::Relaxed};
                    static INIT: AtomicBool = AtomicBool::new(false);
                    assert!(!INIT.swap(true, Relaxed));
                    $name(())
                }
            }
        }
    }

    #[derive(Copy, Clone, Eq, PartialEq, Debug)]
    pub struct GenSym(usize);

    unsafe impl Unique for GenSym {
        fn new_unique() -> Self {
            use std::sync::atomic::{AtomicUsize, Ordering::Relaxed};
            static NEXT: AtomicUsize = AtomicUsize::new(0);
            GenSym(NEXT.fetch_add(1, Relaxed))
        }
    }
}
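On the panic concern from the second bullet above, one option is to borrow the poisoning idea from std::sync::Mutex. A drop guard writes a second sentinel if the update closure unwinds, and the waiting loop panics when it sees that sentinel instead of spinning forever. The sketch below is written under assumptions. POISONED, PoisonOnUnwind, run_update and check are all hypothetical names. With this change, Clock::tick would also have to stop before reaching POISONED, and the match in update would need a POISONED arm.

```rust
use std::panic::{self, AssertUnwindSafe};
use std::sync::atomic::{AtomicU32, Ordering::{Acquire, Release}};

// UPDATING matches the post; POISONED is a hypothetical extra sentinel.
const UPDATING: u32 = u32::MAX;
const POISONED: u32 = u32::MAX - 1;

/// Marks the cell as poisoned if dropped without being disarmed,
/// i.e. if the update closure unwound.
struct PoisonOnUnwind<'a> {
    mtime: &'a AtomicU32,
}

impl Drop for PoisonOnUnwind<'_> {
    fn drop(&mut self) {
        self.mtime.store(POISONED, Release);
    }
}

/// Stand-in for the body of the Ok(_) arm of ClockCell::update, with the guard added.
fn run_update(mtime: &AtomicU32, now: u32, f: impl FnOnce()) {
    let guard = PoisonOnUnwind { mtime };
    f();
    std::mem::forget(guard); // success: disarm the guard
    mtime.store(now, Release);
}

/// What spinwait could do with each loaded value: fail loudly instead of looping forever.
fn check(tc: u32) -> Option<u32> {
    match tc {
        POISONED => panic!("ClockCell poisoned by a panic during update"),
        UPDATING => None, // keep waiting
        tc => Some(tc),
    }
}

fn main() {
    // As if compare_exchange(mtime, UPDATING, ..) had just succeeded.
    let mtime = AtomicU32::new(UPDATING);
    let result = panic::catch_unwind(AssertUnwindSafe(|| {
        run_update(&mtime, 2, || panic!("update closure failed"));
    }));
    assert!(result.is_err());
    assert_eq!(mtime.load(Acquire), POISONED);

    // A waiter now panics instead of spinning forever.
    let waiter = panic::catch_unwind(AssertUnwindSafe(|| check(mtime.load(Acquire))));
    assert!(waiter.is_err());
}
```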

Interesting primitive. So far I can’t think of a way to do this with existing primitives (that I know of) either.

I didn't check whether the atomic::Ordering choices are sound, compared to the alternative of just using SeqCst everywhere. The rest of the implementation seems reasonable to me, AFAICT. I remember reading an article criticizing spin locks fairly recently. I don't remember the main points, but I'd question whether a spin lock is the most reasonable approach here, unless that's just a placeholder implementation anyway.
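One alternative to spinning is to keep the atomic mtime as a fast path but serialize the actual update with a std::sync::Mutex, re-checking mtime after acquiring it (double-checked locking). Waiting threads then sleep in the lock instead of calling yield_now in a loop. In this sketch, Clock is a simplified stand-in that uses a global counter instead of the Unique trait, and BlockingCell is a hypothetical name.

```rust
use std::cell::UnsafeCell;
use std::sync::atomic::{AtomicU32, AtomicU64, Ordering::{Acquire, Relaxed, Release}};
use std::sync::Mutex;
use std::thread;

// Hypothetical stand-in for Clock<Id>: a global counter replaces `Unique`.
static NEXT_ID: AtomicU64 = AtomicU64::new(0);

pub struct Clock {
    id: u64,
    now: u32,
}

impl Clock {
    pub fn new() -> Self {
        Clock { id: NEXT_ID.fetch_add(1, Relaxed), now: 1 }
    }
    pub fn tick(&mut self) {
        self.now = self.now.checked_add(1).expect("Clock overflow");
    }
}

/// ClockCell variant whose waiters block on a Mutex instead of spinning.
pub struct BlockingCell<T> {
    clock_id: u64,
    mtime: AtomicU32, // fast path: already updated this tick?
    lock: Mutex<()>,  // slow path: serializes the update itself
    obj: UnsafeCell<T>,
}

// &T is shared between threads and &mut T may be created on any of them.
unsafe impl<T: Send + Sync> Sync for BlockingCell<T> {}

impl<T> BlockingCell<T> {
    pub fn new(clock: &Clock, val: T) -> Self {
        BlockingCell {
            clock_id: clock.id,
            mtime: AtomicU32::new(clock.now),
            lock: Mutex::new(()),
            obj: UnsafeCell::new(val),
        }
    }

    pub fn update<'a>(&'a self, clock: &'a Clock, f: impl FnOnce(&mut T)) -> &'a T {
        assert_eq!(self.clock_id, clock.id, "wrong clock");
        if self.mtime.load(Acquire) < clock.now {
            // A panic in `f` poisons the mutex, so later callers panic here instead of hanging.
            let _guard = self.lock.lock().unwrap();
            // Re-check: another thread may have finished the update while we waited.
            if self.mtime.load(Acquire) < clock.now {
                // SAFETY: the lock excludes other updaters; no &T was handed out this tick
                // (mtime < now), and older ones ended when `tick` took &mut Clock.
                unsafe { f(&mut *self.obj.get()) };
                self.mtime.store(clock.now, Release);
            }
        }
        // SAFETY: mtime == now, so no mutation can happen until the clock ticks again.
        unsafe { &*self.obj.get() }
    }
}

fn main() {
    let mut clock = Clock::new();
    let cell = BlockingCell::new(&clock, 0u32);
    clock.tick();
    // Several threads race to update during one tick; exactly one runs the closure.
    thread::scope(|s| {
        for _ in 0..4 {
            s.spawn(|| assert_eq!(*cell.update(&clock, |v| *v += 1), 1));
        }
    });
}
```

The trade-off is that the first access in each tick now takes a lock. Whether that beats spinning depends on how long typical updates run.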

Looking at the Unique trait and the example implementation, I think I noticed a small flaw:

/// Safety: No two calls may produce equal values
GenSym(NEXT.fetch_add(1, Relaxed))

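fetch_add wraps around on overflow, so once NEXT passes usize::MAX, new_unique would start returning values it has already handed out.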
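The wrap-around can be seen with AtomicU8, where the limit is reachable in one step. AtomicUsize::fetch_add behaves the same way at usize::MAX.

```rust
use std::sync::atomic::{AtomicU8, Ordering::Relaxed};

fn main() {
    let next = AtomicU8::new(u8::MAX);
    // fetch_add returns the old value and stores old + 1, wrapping on overflow.
    let handed_out = next.fetch_add(1, Relaxed);
    assert_eq!(handed_out, 255);
    // The counter is back at 0, so the next "unique" value would repeat an earlier one.
    assert_eq!(next.load(Relaxed), 0);
}
```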



I haven't put much thought into it; a spinlock was just the first thing I thought of. I'll have to do some research to figure out some alternatives.

Good catch. I'll need to panic on overflow instead. It's unlikely to cause a problem in most programs, since it would take quite a while to overflow a 64-bit usize (though usize is only 32 bits on some platforms...).
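A minimal sketch of panicking on overflow uses AtomicUsize::fetch_update with checked_add, which stores the incremented value only when it does not overflow. Checking the result of fetch_add afterwards would not be enough: by then the counter has already wrapped, and another thread can receive 0 again. The Unique trait is copied from the post so the block stands alone.

```rust
use std::fmt::Debug;
use std::sync::atomic::{AtomicUsize, Ordering::Relaxed};

// Copied from the post so the block is self-contained.
pub unsafe trait Unique: Copy + Clone + Eq + PartialEq + Debug + Send + Sync + 'static {
    /// Safety: No two calls may produce equal values
    fn new_unique() -> Self;
}

#[derive(Copy, Clone, Eq, PartialEq, Debug)]
pub struct GenSym(usize);

unsafe impl Unique for GenSym {
    fn new_unique() -> Self {
        static NEXT: AtomicUsize = AtomicUsize::new(0);
        // fetch_update only stores n + 1 when the closure returns Some, so the
        // counter sticks at usize::MAX and every later call panics instead of
        // wrapping back to 0.
        let id = NEXT
            .fetch_update(Relaxed, Relaxed, |n| n.checked_add(1))
            .expect("GenSym overflow");
        GenSym(id)
    }
}

fn main() {
    let a = GenSym::new_unique();
    let b = GenSym::new_unique();
    assert_ne!(a, b);
}
```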

I found a partial solution. It does need a list/queue of "things that have been updated and need to be reset into the no-longer-up-to-date state", which has to be traversed on each call to tick. But perhaps it's still useful as a sanity check (that the API is sound), or if you want a not-quite-as-efficient-but-unsafe-free version for some other reason. Here's the code (not tested beyond the test case you provided):

#![allow(clippy::all)]

fn main() {
    use clock::Clock;
    use signal::Signal;

    let mut clock = Clock::new();

    let turn = signal::Iter::new(&clock, 1..).map(&clock, |&x| x.unwrap());

    let fizz = (&turn).map(&clock, |&x| match x % 3 {
        0 => "Fizz",
        _ => "",
    });

    let buzz = (&turn).map(&clock, |&x| match x % 5 {
        0 => "Buzz",
        _ => "",
    });

    let fizzbuzz = signal::State::new(&clock, String::new(), |clk, s| {
        use std::fmt::Write;
        s.truncate(0);
        write!(s, "{}{}", fizz.val(clk), buzz.val(clk)).unwrap();
        if s.len() == 0 {
            write!(s, "{}", turn.val(clk)).unwrap();
        }
    });

    for _ in 1..20 {
        clock.tick();
        dbg!(fizzbuzz.val(&clock));
    }
}

// ------------------------
// Basic signals
// ------------------------
pub mod signal {
    use crate::clock::{Clock, ClockCell, Timecode};

    pub trait Signal {
        type Output: ?Sized;
        fn val<'a>(&'a self, clock: &'a Clock) -> &'a Self::Output;
        fn mtime(&self, clock: &Clock) -> Timecode;

        fn coerce<F>(self, f: F) -> Coerce<Self, F>
        where
            Self: Sized,
            Coerce<Self, F>: Signal,
        {
            Coerce {
                signal: self,
                func: f,
            }
        }

        fn map<F, T>(self, clock: &Clock, f: F) -> ClockCell<Map<Self, F, T>>
        where
            Self: Sized,
            F: FnMut(&Self::Output) -> T,
        {
            clock.new_cell(Map {
                dep: self,
                func: f,
                mtime: clock.now(),
                current: None,
            })
        }
    }

    impl<T: ?Sized> Signal for &'_ T
    where
        T: Signal,
    {
        type Output = T::Output;
        fn val<'a>(&'a self, clock: &'a Clock) -> &'a Self::Output {
            T::val(self, clock)
        }
        fn mtime(&self, clock: &Clock) -> Timecode {
            T::mtime(self, clock)
        }
    }

    impl<T: ?Sized> Signal for Box<T>
    where
        T: Signal,
    {
        type Output = T::Output;
        fn val<'a>(&'a self, clock: &'a Clock) -> &'a Self::Output {
            T::val(self, clock)
        }
        fn mtime(&self, clock: &Clock) -> Timecode {
            T::mtime(self, clock)
        }
    }

    impl<T: ?Sized> Signal for std::rc::Rc<T>
    where
        T: Signal,
    {
        type Output = T::Output;
        fn val<'a>(&'a self, clock: &'a Clock) -> &'a Self::Output {
            T::val(self, clock)
        }
        fn mtime(&self, clock: &Clock) -> Timecode {
            T::mtime(self, clock)
        }
    }

    impl<T: ?Sized> Signal for std::sync::Arc<T>
    where
        T: Signal,
    {
        type Output = T::Output;
        fn val<'a>(&'a self, clock: &'a Clock) -> &'a Self::Output {
            T::val(self, clock)
        }
        fn mtime(&self, clock: &Clock) -> Timecode {
            T::mtime(self, clock)
        }
    }

    pub struct Coerce<S, F> {
        signal: S,
        func: F,
    }

    impl<U: ?Sized, S, F> Signal for Coerce<S, F>
    where
        S: Signal,
        F: Fn(&S::Output) -> &U,
    {
        type Output = U;
        fn val<'a>(&'a self, clock: &'a Clock) -> &'a U {
            (self.func)(self.signal.val(clock))
        }

        fn mtime(&self, clock: &Clock) -> u32 {
            self.signal.mtime(clock)
        }
    }

    pub struct Map<S, F, T> {
        dep: S,
        func: F,
        mtime: Timecode,
        current: Option<T>,
    }

    impl<S, F, T> SignalImpl for Map<S, F, T>
    where
        S: Signal,
        F: FnMut(&S::Output) -> T,
    {
        type Output = T;
        fn update(&mut self, clock: &Clock) {
            let tc = self.dep.mtime(clock);
            if tc != self.mtime {
                self.current = Some((self.func)(self.dep.val(clock)));
                self.mtime = tc;
            }
        }

        fn value(&self) -> &T {
            self.current.as_ref().unwrap()
        }

        fn mtime(&self) -> Timecode {
            self.mtime
        }
    }

    pub trait SignalImpl {
        type Output;
        fn update(&mut self, clock: &Clock);
        fn value(&self) -> &Self::Output;
        fn mtime(&self) -> Timecode;
    }

    pub struct Monitor<'a, T: ?Sized> {
        target: &'a mut T,
        dirty: bool,
    }

    impl<'a, T: ?Sized> std::ops::Deref for Monitor<'a, T> {
        type Target = T;
        fn deref(&self) -> &T {
            &*self.target
        }
    }

    impl<'a, T: ?Sized> std::ops::DerefMut for Monitor<'a, T> {
        fn deref_mut(&mut self) -> &mut T {
            self.dirty = true;
            &mut *self.target
        }
    }

    impl<'a, T: ?Sized> Monitor<'a, T> {
        pub fn new(target: &'a mut T) -> Self {
            Monitor {
                target,
                dirty: false,
            }
        }
    }

    pub struct State<T, F> {
        state: T,
        update: F,
        mtime: Timecode,
    }

    impl<T, F> SignalImpl for State<T, F>
    where
        F: FnMut(&Clock, &mut Monitor<'_, T>),
    {
        type Output = T;
        fn update(&mut self, clock: &Clock) {
            let mut monitor = Monitor::new(&mut self.state);
            (self.update)(clock, &mut monitor);
            if monitor.dirty {
                self.mtime = clock.now()
            }
        }
        fn value(&self) -> &T {
            &self.state
        }
        fn mtime(&self) -> Timecode {
            self.mtime
        }
    }

    impl<T, F> State<T, F> {
        pub fn new(clock: &Clock, init: T, update: F) -> ClockCell<Self>
        where
            F: FnMut(&Clock, &mut Monitor<'_, T>),
        {
            clock.new_cell(State {
                state: init,
                update,
                mtime: clock.now(),
            })
        }
    }

    impl<T> Signal for ClockCell<T>
    where
        T: SignalImpl,
    {
        type Output = T::Output;
        fn val<'a>(&'a self, clock: &'a Clock) -> &'a T::Output {
            self.update(clock, |this| this.update(clock)).value()
        }

        fn mtime(&self, clock: &Clock) -> Timecode {
            self.update(clock, |this| this.update(clock)).mtime()
        }
    }

    pub struct Volatile<F, T> {
        updater: F,
        mtime: Timecode,
        value: T,
    }

    impl<F, T> SignalImpl for Volatile<F, T>
    where
        F: FnMut() -> T,
    {
        type Output = T;

        fn update(&mut self, clock: &Clock) {
            self.value = (self.updater)();
            self.mtime = clock.now();
        }

        fn value(&self) -> &T {
            &self.value
        }

        fn mtime(&self) -> Timecode {
            self.mtime
        }
    }

    impl<F, T> Volatile<F, T>
    where
        F: FnMut() -> T,
    {
        pub fn new(clock: &Clock, mut updater: F) -> ClockCell<Self> {
            clock.new_cell(Volatile {
                mtime: clock.now(),
                value: updater(),
                updater,
            })
        }
    }

    pub struct Iter<I, T> {
        mtime: Timecode,
        iter: Option<I>,
        val: Option<T>,
    }

    impl<I, T> SignalImpl for Iter<I, T>
    where
        I: Iterator<Item = T>,
    {
        type Output = Option<T>;
        fn update(&mut self, clock: &Clock) {
            if let Some(it) = &mut self.iter {
                self.val = it.nth((clock.now() - self.mtime) as usize - 1);
                self.mtime = clock.now();
                if self.val.is_none() {
                    self.iter = None;
                }
            }
        }

        fn value(&self) -> &Option<T> {
            &self.val
        }
        fn mtime(&self) -> Timecode {
            self.mtime
        }
    }

    impl<I, T> Iter<I, T>
    where
        I: Iterator<Item = T>,
    {
        pub fn new<II: IntoIterator<IntoIter = I>>(clock: &Clock, iter: II) -> ClockCell<Self> {
            let iter = iter.into_iter();
            clock.new_cell(Iter {
                mtime: clock.now(),
                iter: Some(iter),
                val: None,
            })
        }
    }
}

// ---------------------------
// Clock
// ---------------------------

pub mod clock {
    pub type Timecode = u32;
    const UPDATING: Timecode = Timecode::MAX;

    use crossbeam::channel::{self, Receiver, Sender};
    use once_cell::sync::OnceCell;
    use parking_lot::Mutex;
    use qcell::{QCell, QCellOwner};

    use std::sync::Arc;

    /// A monotonic counter that can be uniquely identified.
    // #[derive(Debug)]
    pub struct Clock {
        identity: QCellOwner,
        timecode: Timecode,
        reset_list_read: Receiver<Arc<dyn Reset>>,
        reset_list_write: Sender<Arc<dyn Reset>>,
    }

    pub trait Reset {
        fn reset(&self, identity: &mut QCellOwner);
    }

    impl Clock {
        pub fn new() -> Self {
            let (reset_list_write, reset_list_read) = channel::unbounded();
            Clock {
                identity: QCellOwner::new(),
                timecode: 1,
                reset_list_read,
                reset_list_write,
            }
        }

        pub fn tick(&mut self) {
            let new = self.timecode + 1;
            if new == UPDATING {
                panic!("Clock overflow");
            } else {
                self.timecode = new;
            }
            while let Ok(reset) = self.reset_list_read.try_recv() {
                reset.reset(&mut self.identity)
            }
        }

        pub fn now(&self) -> u32 {
            self.timecode
        }

        pub fn new_cell<T>(&self, val: T) -> ClockCell<T> {
            let owner = QCellOwner::new();
            let cell = ClockCell {
                obj: QCell::new(&owner, val),
                owner: Arc::new(ClockCellOwner {
                    dirty: Mutex::new(None),
                    clean: QCell::new(&self.identity, OnceCell::from(owner)),
                }),
            };
            self.reset_list_write.send(cell.owner.clone()).unwrap();
            cell
        }

        pub fn new_cell_uninit<T>(&self, val: T) -> ClockCell<T> {
            let owner = QCellOwner::new();
            ClockCell {
                obj: QCell::new(&owner, val),
                owner: Arc::new(ClockCellOwner {
                    dirty: Mutex::new(Some(owner)),
                    clean: QCell::new(&self.identity, OnceCell::new()),
                }),
            }
        }
    }

    // A synchronized cell that will update T at most once per
    // clock cycle.
    pub struct ClockCell<T: ?Sized> {
        owner: Arc<ClockCellOwner>,
        obj: QCell<T>,
    }

    struct ClockCellOwner {
        dirty: Mutex<Option<QCellOwner>>,
        clean: QCell<OnceCell<QCellOwner>>,
    }

    impl Reset for ClockCellOwner {
        fn reset(&self, identity: &mut QCellOwner) {
            *self.dirty.lock() = identity.rw(&self.clean).take();
        }
    }

    impl<T: ?Sized> ClockCell<T> {
        pub fn update<'a>(&'a self, clock: &'a Clock, update: impl FnOnce(&mut T)) -> &'a T {
            self.owner
                .clean
                .ro(&clock.identity)
                .get_or_init(|| {
                    let mut owner = self.owner.dirty.lock().take().unwrap();
                    update(owner.rw(&self.obj));
                    clock.reset_list_write.send(self.owner.clone()).unwrap();
                    owner
                })
                .ro(&self.obj)
        }
    }
}

[dependencies]
crossbeam = "0.8.1"
once_cell = "1.10.0"
parking_lot = "0.12.0"
qcell = "0.5.0"

This use case sounds very similar to GhostCell.
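For comparison, here is a stripped-down sketch of the GhostCell idea, written for illustration and not taken from the ghost-cell crate. An invariant lifetime brands a token and its cells, and whether the token is borrowed mutably or immutably decides whether a cell hands out &mut T or &T.

```rust
use std::cell::UnsafeCell;
use std::marker::PhantomData;

// Invariant in 'id: subtyping cannot turn one brand into another.
type InvariantLifetime<'id> = PhantomData<fn(&'id ()) -> &'id ()>;

pub struct GhostToken<'id> {
    _brand: InvariantLifetime<'id>,
}

impl GhostToken<'_> {
    /// Each call creates a fresh, unnameable brand 'new for the closure.
    pub fn new<R>(f: impl for<'new> FnOnce(GhostToken<'new>) -> R) -> R {
        f(GhostToken { _brand: PhantomData })
    }
}

pub struct GhostCell<'id, T: ?Sized> {
    _brand: InvariantLifetime<'id>,
    value: UnsafeCell<T>,
}

impl<'id, T> GhostCell<'id, T> {
    pub fn new(value: T) -> Self {
        GhostCell { _brand: PhantomData, value: UnsafeCell::new(value) }
    }
}

impl<'id, T: ?Sized> GhostCell<'id, T> {
    pub fn borrow<'a>(&'a self, _token: &'a GhostToken<'id>) -> &'a T {
        // SAFETY: a shared token borrow rules out any live &mut GhostToken<'id>.
        unsafe { &*self.value.get() }
    }

    pub fn borrow_mut<'a>(&'a self, _token: &'a mut GhostToken<'id>) -> &'a mut T {
        // SAFETY: the unique token borrow rules out every other access to 'id cells.
        unsafe { &mut *self.value.get() }
    }
}

fn main() {
    let total = GhostToken::new(|mut token| {
        let a = GhostCell::new(1);
        let b = GhostCell::new(2);
        *a.borrow_mut(&mut token) += 10;
        *b.borrow_mut(&mut token) += 20;
        *a.borrow(&token) + *b.borrow(&token)
    });
    assert_eq!(total, 33);
}
```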


Seeing how this conceptually relates to the existing options is certainly useful, and it shows that my API might be more restrictive than necessary. Effectively, each cell holds a mutable-access token that it can downgrade to shared access whenever it wants, and Clock::tick() issues a new mutable token to each cell.

Right now, I force the mutable-to-shared downgrade to happen after a single access, which is not necessary. I could split that operation into two:

  • A fallible method that provides mutable access via a closure if the token hasn't been downgraded yet, and
  • A method that downgrades the token and provides a shared reference.
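A single-threaded sketch of that split might look like this. The method names try_modify and freeze are hypothetical, and Clock is a simplified stand-in that uses a global counter for identity instead of Unique. try_modify hands out &mut T until freeze is called. freeze returns a &T tied to the &Clock borrow, so tick() cannot run while that reference is alive. Keeping the bookkeeping in Cell makes the type !Sync, which avoids the atomics entirely.

```rust
use std::cell::{Cell, UnsafeCell};
use std::sync::atomic::{AtomicU64, Ordering::Relaxed};

// Hypothetical stand-in for Clock<Id>; a global counter replaces `Unique`.
static NEXT_ID: AtomicU64 = AtomicU64::new(0);

pub struct Clock {
    id: u64,
    now: u32,
}

impl Clock {
    pub fn new() -> Self {
        Clock { id: NEXT_ID.fetch_add(1, Relaxed), now: 1 }
    }
    pub fn tick(&mut self) {
        self.now = self.now.checked_add(1).expect("Clock overflow");
    }
}

/// Single-threaded ClockCell with a split modify/freeze API.
pub struct ClockCell<T> {
    clock_id: u64,
    frozen_at: Cell<u32>, // tick at which the mutable token was last downgraded
    busy: Cell<bool>,     // set while `try_modify` runs, to reject re-entrant calls
    obj: UnsafeCell<T>,
}

impl<T> ClockCell<T> {
    pub fn new(clock: &Clock, val: T) -> Self {
        ClockCell {
            clock_id: clock.id,
            frozen_at: Cell::new(0), // 0 is below every tick, so the cell starts modifiable
            busy: Cell::new(false),
            obj: UnsafeCell::new(val),
        }
    }

    fn check(&self, clock: &Clock) {
        assert_eq!(self.clock_id, clock.id, "wrong clock");
        // If `f` panicked earlier, `busy` is still set and every later call panics here.
        assert!(!self.busy.get(), "re-entrant or poisoned ClockCell");
    }

    /// Mutable access; returns None once the cell has been frozen this tick.
    pub fn try_modify<R>(&self, clock: &Clock, f: impl FnOnce(&mut T) -> R) -> Option<R> {
        self.check(clock);
        if self.frozen_at.get() == clock.now {
            return None;
        }
        self.busy.set(true);
        // SAFETY: not frozen this tick, so no &T from `freeze` is alive (those
        // borrow the Clock, and `tick` needs &mut Clock); `busy` rules out re-entry.
        let r = f(unsafe { &mut *self.obj.get() });
        self.busy.set(false);
        Some(r)
    }

    /// Downgrades to shared access until the next tick.
    pub fn freeze<'a>(&'a self, clock: &'a Clock) -> &'a T {
        self.check(clock);
        self.frozen_at.set(clock.now);
        // SAFETY: frozen, so `try_modify` refuses &mut T until the clock ticks.
        unsafe { &*self.obj.get() }
    }
}

fn main() {
    let mut clock = Clock::new();
    let cell = ClockCell::new(&clock, Vec::new());
    clock.tick();
    cell.try_modify(&clock, |v| v.push(1));
    cell.try_modify(&clock, |v| v.push(2)); // still allowed: not frozen yet
    assert_eq!(cell.freeze(&clock), &vec![1, 2]);
    assert!(cell.try_modify(&clock, |v| v.push(3)).is_none()); // frozen for this tick
    clock.tick();
    assert!(cell.try_modify(&clock, |v| v.clear()).is_some()); // new tick, new token
}
```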

It's similar, but ClockCell uses some runtime checks to extend the mutable access beyond the lifetime of the corresponding &mut Token. You need the mutable token to reset the cells' mutability, and then each one decides on its own when to downgrade from exclusive to shared access.