Atomic option-like take

Inspired by this thread I was wondering if there are any concurrent primitives that supply an atomic take operation? As in, an Option on which you can call take concurrently and only have the first call get the value.

I looked around in both crossbeam and the standard library but couldn't find any such structure.

It doesn't seem too difficult to implement:

struct AtomicOption<T> {
    taken: AtomicBool,
    value: ManuallyDrop<T>,
}

impl<T> AtomicOption<T> {
    pub fn new(value: T) -> Self {
        AtomicOption {
            taken: AtomicBool::new(false),
            value: ManuallyDrop::new(value),
        }
    }
    pub fn take(&self) -> Option<T> {
        if self.taken.swap(true, Ordering::Relaxed) == false {
            unsafe { Some(ptr::read(&*self.value)) }
        } else {
            None
        }
    }
}

playground

3 Likes

I think this should be

self.taken.swap(true, Ordering::AcqRel)
1 Like

Well, you could use Arc::new(Mutex::new(Some(94))); (i.e. <Arc<Mutex<...>>):
https://play.rust-lang.org/?version=stable&mode=debug&edition=2018&gist=f8ea5a43ce0c2d6b79ffe616b97bf10b

I mean, only the Mutex is needed to be equivalent to the code posted. My container would also need an Arc to be shared. The thing is: A mutex would involve locking, while the proposed option does not need any locks at all.

1 Like

Well Acquire should definitely by enough here, since this is equivalent to locking a mutex and never releasing the lock, and a mutex only needs Acquire.

Additionally, I think Relaxed is enough, because:

  1. If two threads are given the value false, then the swap was not atomic. Compare to fetch_add which guarantees each call results in a unique integer (modulo overflow).
  2. The write to value should be visible to the thread calling take, because if it wasn't, how could the code below be defined behavior?
struct Container<T> {
    value: T,
}

impl<T> Container<T> {
    pub fn new(value: T) -> Self {
        Container {
            value,
        }
    }
    pub fn get(&self) -> &T {
        &self.value
    }
}
3 Likes

There's arc-swap.

1 Like

Alice, you should definitely make an atomic-option crate :slight_smile:

1 Like

AtomicCell<Option<T>> from crossbeam?

For types that are not just an integer, AtomicCell is just a mutex. See is_lock_free.

Nice idea. The AtomicOption should .take() itself in drop, if needed, so that the inner value drops. It's unfortunate to unconditionally hang a destructor on this type, but it's all we can do I think.

I have a destructor in the playground link.

2 Likes

I think you need to manually implement and constrain Sync for T: Send, because right now it would allow take(&self) with just T: Sync to effectively Send the value between threads.

Ah yes that's true, the automatic Sync bound wouldn't be correct. Good point

If that's the only API your container is ever gonna get, then yes, Relaxed is fine because there are no writes to memory that the AtomicBool accesses should synchronize with (and as soon as the API gets richer offering read-write features you'd end up reimplementing Mutex / RwLock).

On drop you do not need an atomic load, since by definition, you are uncontested :upside_down_face:

Summary:

type PhantomUnsync = ::core::marker::PhantomData<Cell<u8>>;

struct AtomicOption<T> {
    taken: AtomicBool,
    value: ManuallyDrop<T>,
    _unsync: PhantomUnsync,
}

impl<T> AtomicOption<T> {
    pub
    fn new (value: T) -> Self
    {
        AtomicOption {
            taken: AtomicBool::new(false),
            value: ManuallyDrop::new(value),
            _unsync: Default::default(),
        }
    }

    pub
    fn take (self: &'_ Self) -> Option<T>
    {
        unsafe {
            if self.taken.swap(true, Ordering::Relaxed) {
                None
            } else {
                Some(ptr::read(&*self.value))
            }
        }
    }

    /// Because why not
    pub
    fn take_mut (self: &'_ mut Self) -> Option<T>
    {
        unsafe {
            if mem::replace(self.taken.get_mut(), true) {
                None
            } else {
                Some(ptr::read(&*self.value))
            }
        }
}

/// Sync bound is not needed, since no API exposes shared access to the inner T
unsafe impl<T : Send> Sync for AtomicOption<T> {}

impl<T> Drop for AtomicOption<T> {
    fn drop (self: &'_ mut Self)
    {
        if *self.taken.get_mut() { return; }
        unsafe {
            ManuallyDrop::drop(&mut self.value)
        }
    }
}

I think that the questions raised in this thread confirm that this is not something as easy to do correctly as it may have seemed, so it's definitely crate-worthy and you should publish it.

With an honorable mention to ::arcswap, although I can see different use-cases for each.

2 Likes

Well exactly, and I suppose the limited api is also why it doesn't already exist.

1 Like

I guess I'll go ahead and do that then. As for naming, I suppose something like TakeOnce or AtomicTake might be better.

2 Likes

I personally like ::atomic_take for the crate name, it has all the featured functionality described in it, which is important for visibility.

1 Like

I published it here.

6 Likes

AtomicCell should be lock-free for Box and Arc too, no?

I just tested it and yes it seems they are lock free. I'm kinda surprised. I suppose the advantage of AtomicTake is then just that it doesn't require an allocation for the Box. (in fact the crate is no-std compatible)