State of the art for Software Transactional Memory in Rust?

I am aware that, in an ideal situation for STM, we want the compiler to be able to prove that a function is pure (in the Haskell sense). This allows the runtime to retry a function after a failed transaction without fear of side effects.

I acknowledge that Rust currently does not have this ability.

Having said that: what is the current state of the art for either (1) using STM in Rust or (2) implementing STM in Rust?

Pre-emptive: Let us please focus the discussion on how to use/do STM in Rust only. In particular, please do not derail the conversation into a debate about the merits / limitations of STM itself.

Buried in my thesis somewhere is an in-memory 2-Phase Commit implementation. The API is a little unwieldy, but the underlying concept worked quite well. Roughly, each transaction holds an &mut T, which forces all access to T to pass through the transaction. It then only allows operations that can be reliably reversed and stores the undo information until the transaction is committed. This design is presumed-commit: if the transaction is forgotten or dropped without being explicitly reverted, the changes will be applied (unless there was an internal error forcing a revert).

Here’s a rough sketch of the interfaces involved (from memory):

use std::fmt::Debug;

// An operation that can be applied to a T and later undone.
trait RevertableOp<T> {
    type Log: UndoLog<T>;
    type Err: Debug;
    fn apply(self, target: &mut T) -> Result<Self::Log, Self::Err>;
}

// The undo information produced by a successful operation.
trait UndoLog<T> {
    fn revert(self, target: &mut T);
}

enum Transaction<'a, T, Log> {
    Open(&'a mut T, Log),
    Reverted(Box<dyn Debug>),
}

impl<'a, T, Log> Transaction<'a, T, Log> {
    fn apply<Op: RevertableOp<T>>(self, op: Op) -> Transaction<'a, T, (Op::Log, Log)> {
        // …
    }
}
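
To make the trait shapes a bit more concrete, here is a minimal sketch of how two operations on a Vec might implement them. The `Push` and `Pop` operations, their undo records, and the tuple/unit `UndoLog` impls are all made up for illustration (they are not from the thesis code); the nested tuple only mirrors the `(Op::Log, Log)` shape in the sketch above.

```rust
use std::fmt::Debug;

trait UndoLog<T> {
    fn revert(self, target: &mut T);
}

trait RevertableOp<T> {
    type Log: UndoLog<T>;
    type Err: Debug;
    fn apply(self, target: &mut T) -> Result<Self::Log, Self::Err>;
}

/// Empty log: nothing to undo.
impl<T> UndoLog<T> for () {
    fn revert(self, _target: &mut T) {}
}

/// Chained log: the newest entry sits in `.0`, so it is undone first.
impl<T, A: UndoLog<T>, B: UndoLog<T>> UndoLog<T> for (A, B) {
    fn revert(self, target: &mut T) {
        self.0.revert(target);
        self.1.revert(target);
    }
}

/// Hypothetical operation: push one value.
struct Push(i32);
/// Undo record for `Push`: reverting pops the value back off.
struct PushUndo;

impl UndoLog<Vec<i32>> for PushUndo {
    fn revert(self, target: &mut Vec<i32>) {
        target.pop();
    }
}

impl RevertableOp<Vec<i32>> for Push {
    type Log = PushUndo;
    type Err = &'static str;
    fn apply(self, target: &mut Vec<i32>) -> Result<PushUndo, &'static str> {
        target.push(self.0);
        Ok(PushUndo)
    }
}

/// Hypothetical operation: pop one value, failing on an empty Vec.
struct Pop;
/// Undo record for `Pop`: remembers the removed value.
struct PopUndo(i32);

impl UndoLog<Vec<i32>> for PopUndo {
    fn revert(self, target: &mut Vec<i32>) {
        target.push(self.0);
    }
}

impl RevertableOp<Vec<i32>> for Pop {
    type Log = PopUndo;
    type Err = &'static str;
    fn apply(self, target: &mut Vec<i32>) -> Result<PopUndo, &'static str> {
        target.pop().map(PopUndo).ok_or("empty vector")
    }
}

/// Applies Pop then Push(9) to [1, 2], then reverts both in reverse order.
fn pop_push_then_revert() -> Vec<i32> {
    let mut v = vec![1, 2];
    let log = ();
    let log = (Pop.apply(&mut v).expect("vector is non-empty"), log);
    let log = (Push(9).apply(&mut v).expect("push cannot fail"), log);
    // v is [1, 9] at this point.
    log.revert(&mut v);
    v
}
```

The revert order matters here: undoing the `Pop` before the `Push` would leave `[1, 9]` instead of the original `[1, 2]`, which is why the newest log entry goes at the front of the tuple.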

If you're ok with using clone() to make snapshots, you can do something like this:

/// Takes a snapshot of all named vars as the block is entered,
/// and reverts any changes if there is a non-local exit from
/// the block (break, continue, return, panic, etc...)
macro_rules! txn {
    ($($var:ident),* => $($text:tt)*) => {
        {
            $(let mut $var = Snapshot::begin($var);)*
            let output = {
                $(let $var = &mut *$var;)*
                $($text)*
            };
            $(Snapshot::commit($var);)*
            output
        }
    }
}

#[test]
fn test_txn() {
    fn pop2(v:&mut Vec<usize>)->Option<(usize, usize)> {
        txn!{ v =>
            let a = v.pop()?;
            let b = v.pop()?;
            Some((b,a))
        }
    }
    
    let mut v:Vec<usize> = vec![1,2,3];
    
    assert_eq!(pop2(&mut v), Some((2,3)));
    assert_eq!(v, vec![1]);

    assert_eq!(pop2(&mut v), None);
    assert_eq!(v, vec![1]);
}

// === Implementation details ================

pub struct Snapshot<'a,T:Clone> {
    place: &'a mut T,
    orig: Option<T>
}

impl<'a, T:Clone> Drop for Snapshot<'a,T> {
    fn drop(&mut self) {
        if let Some(orig) = self.orig.take() {
            *self.place = orig;
        }
    }
}

impl<'a, T:Clone> Snapshot<'a,T> {
    pub fn begin(place: &'a mut T)->Self {
        Snapshot {
            orig: Some(place.clone()),
            place
        }
    }
    
    pub fn commit(mut self:Self) {
        self.orig = None;
    }
    
    pub fn revert(self:Self) {}
}

impl<'a,T:Clone> std::ops::Deref for Snapshot<'a,T> {
    type Target = T;
    fn deref(&self)->&T { &*self.place }
}

impl<'a,T:Clone> std::ops::DerefMut for Snapshot<'a,T> {
    fn deref_mut(&mut self)->&mut T { &mut *self.place }
}
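
If the macro feels too magical, the same guard can also be used by hand. The following is a small standalone sketch (it repeats a trimmed-down `Snapshot` so it compiles on its own); the `transfer` function and its error string are invented for the example.

```rust
use std::ops::{Deref, DerefMut};

/// Trimmed copy of the guard above: restores the original value on drop
/// unless `commit` was called.
pub struct Snapshot<'a, T: Clone> {
    place: &'a mut T,
    orig: Option<T>,
}

impl<'a, T: Clone> Snapshot<'a, T> {
    pub fn begin(place: &'a mut T) -> Self {
        Snapshot { orig: Some(place.clone()), place }
    }

    pub fn commit(mut self) {
        // Forget the saved copy so that Drop has nothing to restore.
        self.orig = None;
    }
}

impl<'a, T: Clone> Drop for Snapshot<'a, T> {
    fn drop(&mut self) {
        if let Some(orig) = self.orig.take() {
            *self.place = orig;
        }
    }
}

impl<'a, T: Clone> Deref for Snapshot<'a, T> {
    type Target = T;
    fn deref(&self) -> &T { &*self.place }
}

impl<'a, T: Clone> DerefMut for Snapshot<'a, T> {
    fn deref_mut(&mut self) -> &mut T { &mut *self.place }
}

/// Hypothetical example: move `amount` between two balances, all or nothing.
fn transfer(from: &mut u64, to: &mut u64, amount: u64) -> Result<(), &'static str> {
    let mut from = Snapshot::begin(from);
    let mut to = Snapshot::begin(to);

    // The credit is applied eagerly...
    *to += amount;
    // ...but an early return here drops both guards, which undoes it.
    let remaining = from.checked_sub(amount).ok_or("insufficient funds")?;
    *from = remaining;

    from.commit();
    to.commit();
    Ok(())
}
```

Note that this only protects against the early return; like the macro, it relies on the `&mut` borrow for exclusivity rather than doing any conflict detection between threads.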

As a third option, you could look to a multiversion concurrency control approach. I've implemented the core algorithm, but you'd need to write some kind of wrapper to make it a bit easier to use:

(untested, but compiles)

pub mod mvcc_core {
    use std::collections::BTreeMap;
    use std::collections::BTreeSet;
    use std::ops::DerefMut;
    pub struct Mvcc<K, V> {
        next_id: usize,
        txns: BTreeSet<usize>,
        data: BTreeMap<(K, usize), Option<V>>,
        updates: BTreeSet<(usize, Option<K>)>,
    }

    pub struct UpdateLock<K, V, T> {
        id: usize,
        updates: BTreeMap<K, Option<V>>,
        table: T,
    }

    impl<K: Ord + Clone, V: Clone> Mvcc<K, V> {
        pub fn vacuum(&mut self) {
            let oldest = match self.txns.iter().next() {
                Some(&id) => id,
                None => self.next_id - 1,
            };
            let keep = self.updates.split_off(&(oldest + 1, None));
            let discard = std::mem::replace(&mut self.updates, keep);
            for (id, opt_key) in discard {
                let key = opt_key.unwrap();
                if self.current(key.clone()) != id {
                    self.data.remove(&(key, id));
                }
            }
        }

        pub fn read(&self, key: K, version: usize) -> Option<V> {
            debug_assert!(self.txns.contains(&version));
            self.data
                .range((key.clone(), 0)..=(key, version))
                .last()
                .map(|(_, v)| v.clone())
                .flatten()
        }

        pub fn current(&self, key: K) -> usize {
            self.data
                .range((key.clone(), 0)..=(key, usize::MAX))
                .last()
                .map(|(&(_, v), _)| v)
                .unwrap_or(0)
        }

        pub fn prepare_update<T, Deps>(
            table: T,
            id: usize,
            updates: BTreeMap<K, Option<V>>,
            deps: Deps,
        ) -> Option<UpdateLock<K, V, T>>
        where
            T: DerefMut<Target = Self>,
            Deps: IntoIterator<Item = K>,
        {
            for key in deps {
                if table.current(key) >= id {
                    return None;
                }
            }
            for key in updates.keys() {
                if table.current(key.clone()) >= id {
                    return None;
                }
            }
            Some(UpdateLock { id, updates, table })
        }

        pub fn begin(&mut self) -> usize {
            let id = self.next_id;
            self.txns.insert(id);
            self.next_id += 1;
            id
        }

        pub fn end(&mut self, id: usize) {
            self.txns.remove(&id);
            self.vacuum();
        }

        pub fn new() -> Self {
            Mvcc {
                next_id: 1,
                txns: Default::default(),
                data: Default::default(),
                updates: Default::default(),
            }
        }
    }

    impl<K: Ord + Clone, V: Clone, T: DerefMut<Target = Mvcc<K, V>>> UpdateLock<K, V, T> {
        pub fn apply(mut self) {
            for (key, opt_val) in self.updates {
                self.table.data.insert((key.clone(), self.id), opt_val);
                self.table.updates.insert((self.id, Some(key)));
            }
        }
    }
}
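
To show the begin / read / commit flow without pulling in the whole module, here is a deliberately simplified standalone sketch of the same idea. It is not the module above: the `Store` type, the `String` keys, the `i64` values and the `commit` signature are assumptions made for the example, and there is no vacuuming or tracking of open transactions.

```rust
use std::collections::BTreeMap;

/// Simplified multiversion store: every write is kept under (key, writer id),
/// so a reader only sees versions at or below its own id.
struct Store {
    next_id: usize,
    data: BTreeMap<(String, usize), Option<i64>>,
}

impl Store {
    fn new() -> Self {
        Store { next_id: 1, data: BTreeMap::new() }
    }

    /// Start a transaction; the id doubles as its snapshot version.
    fn begin(&mut self) -> usize {
        let id = self.next_id;
        self.next_id += 1;
        id
    }

    /// Newest value of `key` written at or before `version`.
    fn read(&self, key: &str, version: usize) -> Option<i64> {
        self.data
            .range((key.to_string(), 0)..=(key.to_string(), version))
            .next_back()
            .and_then(|(_, v)| *v)
    }

    /// Id of the latest writer of `key`, or 0 if it was never written.
    fn current(&self, key: &str) -> usize {
        self.data
            .range((key.to_string(), 0)..=(key.to_string(), usize::MAX))
            .next_back()
            .map(|(&(_, id), _)| id)
            .unwrap_or(0)
    }

    /// Apply `writes` for transaction `id`, unless a key it read or wants to
    /// write already has a version at or above `id`. Returns whether it committed.
    fn commit(&mut self, id: usize, reads: &[&str], writes: &[(&str, Option<i64>)]) -> bool {
        let touched = reads.iter().copied().chain(writes.iter().map(|(k, _)| *k));
        for key in touched {
            if self.current(key) >= id {
                return false; // conflict: the caller should retry with a new id
            }
        }
        for (key, val) in writes {
            self.data.insert((key.to_string(), id), *val);
        }
        true
    }
}

/// Two transactions race to write "x"; the older one loses.
fn conflicting_writers() -> (bool, bool, Option<i64>) {
    let mut store = Store::new();
    let t1 = store.begin();
    let t2 = store.begin();
    let second = store.commit(t2, &[], &[("x", Some(20))]);
    let first = store.commit(t1, &[], &[("x", Some(10))]);
    let t3 = store.begin();
    (second, first, store.read("x", t3))
}
```

A wrapper for real use would presumably put the store behind a Mutex or RwLock and loop on a failed commit, re-running the transaction body with a fresh id.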
