Can't Find the Source of a Concurrency-Related Error

I've been looking at my code for over an hour now and I can't find the source of the error. The biggest issue is that the basic tests complete just fine, but the benchmark fails. At least I've now managed to get the benchmark to panic, which happens in the loop {} when the count reaches 100. Aside from pointing out the source of the error, any tips for debugging this are welcome as well.

Code (including tests/benchmarks) can be found on GitHub[1]:

#[cfg(test)]
mod tests;

use std::ops::Deref;
use std::ops::DerefMut;
use std::sync::atomic::Ordering::SeqCst;
use std::sync::Mutex;
use std::sync::MutexGuard;

const INVALID_INDEX: isize = -1;

enum SlotInner<T> {
    Filled(T),
    Empty(isize),
}

struct Slot<T> {
    inner: Mutex<SlotInner<T>>,
}

impl<T> Slot<T> {
    fn new(next_free_slot_index: isize) -> Self {
        Self {
            inner: Mutex::new(SlotInner::Empty(next_free_slot_index)),
        }
    }
}

pub struct Allocator<T> {
    storage: std::boxed::Box<[Slot<T>]>,
    free: std::sync::atomic::AtomicIsize,
}

impl<T> Allocator<T> {
    pub fn new(capacity: usize) -> Self {
        // Both bounds must hold; a second macro argument would be treated as the panic message.
        assert!(1 <= capacity && capacity <= (isize::MAX as usize));
        let mut storage = Vec::with_capacity(capacity);

        for next_free_slot_index in 1..capacity {
            storage.push(Slot::new(next_free_slot_index as isize))
        }

        storage.push(Slot::new(INVALID_INDEX));
        let storage = storage.into_boxed_slice();
        debug_assert!(capacity == storage.len());

        Self {
            storage,
            free: std::sync::atomic::AtomicIsize::new(0),
        }
    }

    #[track_caller]
    pub fn box_it(&self, value: T) -> Box<'_, T> {
        let Self { storage, free } = &self;
        let mut index;
        let mut slot_guard;
        let mut count = 0;

        loop {
            index = free.load(SeqCst);
            assert_ne!(INVALID_INDEX, index, "out of reserved memory");

            match storage[index as usize].inner.try_lock() {
                Ok(guard) => {
                    slot_guard = guard;
                    break;
                }
                Err(std::sync::TryLockError::WouldBlock) => {
                    count += 1;
                    assert!(count < 100);
                }
                Err(std::sync::TryLockError::Poisoned(e)) => {
                    panic!("{}", e)
                }
            }
        }

        let next_free = match slot_guard.deref() {
            SlotInner::Empty(n) => *n,
            SlotInner::Filled(_) => unreachable!(),
        };

        free.store(next_free, SeqCst);
        *slot_guard = SlotInner::Filled(value);

        Box {
            free,
            index,
            inner: slot_guard,
        }
    }
}

pub struct Box<'a, T> {
    inner: MutexGuard<'a, SlotInner<T>>,
    free: &'a std::sync::atomic::AtomicIsize,
    index: isize,
}

impl<T> Deref for Box<'_, T> {
    type Target = T;

    fn deref(&self) -> &T {
        match self.inner.deref() {
            SlotInner::Filled(value) => value,
            SlotInner::Empty(_) => unreachable!(),
        }
    }
}

impl<T> DerefMut for Box<'_, T> {
    fn deref_mut(&mut self) -> &mut T {
        match self.inner.deref_mut() {
            SlotInner::Filled(value) => value,
            SlotInner::Empty(_) => unreachable!(),
        }
    }
}

impl<T> Drop for Box<'_, T> {
    fn drop(&mut self) {
        *self.inner =
            SlotInner::Empty(self.free.swap(self.index, SeqCst));
    }
}

[1]: I haven't included a license yet, so I'll just declare the code to be dual-licensed under MIT/Apache here for now. Feel free to copy any necessary parts from the repository here.

pub fn box_it(&self, value: T) -> Box<'_, T> {
    let Self { storage, free } = &self;
    let mut index;
    let mut slot_guard;
    let mut count = 0;

    loop {
        index = free.load(SeqCst);
        assert_ne!(INVALID_INDEX, index, "out of reserved memory");

        match storage[index as usize].inner.try_lock() {
            Ok(guard) => {
                let next_free = match *guard {
                    SlotInner::Empty(n) => n,
                    SlotInner::Filled(_) => unreachable!(),
                };

                // other threads that are dropping Boxes could’ve updated `free`!
                if free.compare_exchange_weak(index, next_free, SeqCst, SeqCst).is_ok() {
                    slot_guard = guard;
                    break;
                }
            }
            Err(std::sync::TryLockError::WouldBlock) => {},
            Err(std::sync::TryLockError::Poisoned(e)) => {
                panic!("{}", e)
            }
        }
        count += 1;
        assert!(count < 100);
        // important, otherwise 100 is reached really quickly
        std::thread::yield_now();
    }

    *slot_guard = SlotInner::Filled(value);

    Box {
        free,
        index,
        inner: slot_guard,
    }
}

This seems to work for me (see the comments in the code for an explanation of the changes).
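
To illustrate why the plain `store` is replaced by a compare-exchange, here is a minimal single-threaded sketch. It is not taken from the repository: `stale_update` is a made-up name, the values 1 and 3 are arbitrary, and the extra `store` only stands in for another thread dropping a Box between the load and the update. It uses the strong `compare_exchange` so the outcome is deterministic; the `_weak` variant used above may also fail spuriously, which is fine inside a retry loop.

```rust
use std::sync::atomic::{AtomicIsize, Ordering::SeqCst};

/// Simulates, on one thread, `free` changing between the load and the update.
/// Returns the result of the compare-exchange.
fn stale_update(free: &AtomicIsize) -> Result<isize, isize> {
    // What box_it observed at the top of its loop.
    let index = free.load(SeqCst);
    // Hypothetical "next free" index read from the locked slot.
    let next_free = 1;
    // Stand-in for another thread's Drop publishing index 3 in the meantime.
    free.store(3, SeqCst);
    // Fails because `free` no longer holds `index`; a plain store would have
    // silently overwritten the 3 and lost that slot.
    free.compare_exchange(index, next_free, SeqCst, SeqCst)
}
```

Calling `stale_update` on an `AtomicIsize::new(0)` yields `Err(3)` and leaves `free` at 3, so the caller knows it has to retry with a fresh load.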


One issue that springs to mind (may not be the issue) is Box’s Drop impl. It swaps free before unlocking the mutex. Once the swap occurs, other thread(s) can observe it. Now imagine the following sequence:

(1) T1 (doing Drop): swaps free and installs its index.
(2) T1: descheduled by kernel
(3) T2: loads free
(4) T2: spins trying to acquire free slot
(5) T1: (eventually put back on CPU) writes to self.inner and unlocks the mutex

T2 can run up the counter to 100 in step 4 if the machine is loaded.

This is a theoretical concern and, as mentioned, may not be the source of your panic.
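
The window between steps 1 and 5 can be shown deterministically in a small sketch. This is not the allocator itself: a single `Mutex<u32>` stands in for a slot, the function name is made up for illustration, and T1's descheduling is simulated by simply holding the guard while a second thread runs.

```rust
use std::sync::atomic::{AtomicIsize, Ordering::SeqCst};
use std::sync::{Mutex, TryLockError};
use std::thread;

/// Returns true if a second thread gets `WouldBlock` on a slot whose index
/// was already published while its guard is still held.
fn published_slot_is_still_locked() -> bool {
    let slot = Mutex::new(0u32);
    let free = AtomicIsize::new(-1);

    // "T1": publish the index while still holding the guard, as Drop does.
    let guard = slot.lock().unwrap();
    free.store(0, SeqCst);

    // "T2": sees the published index and tries to take the slot.
    let blocked = thread::scope(|s| {
        s.spawn(|| {
            let index = free.load(SeqCst);
            assert_eq!(index, 0);
            let blocked = matches!(slot.try_lock(), Err(TryLockError::WouldBlock));
            blocked
        })
        .join()
        .unwrap()
    });

    // "T1" resumes and only now unlocks the slot.
    drop(guard);
    blocked
}
```

The function returns `true`: as long as the guard outlives the publication of the index, any thread that picks up that index can only spin.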

Have you tried to add simple printouts to see if you can get a trace of slot usage leading up to the panic? Printouts will change timing, of course, but maybe you can still trigger a panic.

You can also try panic=abort and look at the core dump to see the state of the allocator. Maybe that would shed light on the issue.


That was the source of the error. In the previous version, where free was behind a mutex, I correctly dropped the guard, but when I converted it to the atomic version I somehow forgot about it.

Thanks a lot! You probably saved me from creating a graph for the whole box_it and drop function to figure out the problem, which would've easily taken 2+ hours.

EDIT/P.S.: Shockingly, this version is still slower for me than the original basic solution, even after switching to acquire/release. That's a really interesting result.

box                  :  842.79 ns
safe::basic/std{17}  : 6715.4  ns
safe::advanced/v3{17}: 8698.6  ns
