Simple atomic slice allocator

I am learning atomics now, as I am very interested in wait-free data structures.

I have used Miri before to find bugs in my unsafe code, and I heard of loom, which is good at catching the atomics-related problems we can run into. So I tried both on my allocator.

For practice, I made an allocator. All it does is allocate blocks of memory from which it hands out slices. Of course, it is a bit more complicated than that, because the allocator can be shared between threads and allocation is lock-free.

I think I used loom correctly, but because my code has so many atomic accesses and branches, loom just kept iterating. It hadn't stopped after 5 minutes, so I stopped it myself.

Miri didn't find anything, so at least I have that :slight_smile: (I know it doesn't mean too much, but still.)

My Allocator
use std::sync::atomic::AtomicPtr;
use std::sync::atomic::AtomicU32;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::*;

const BLOCK_SIZE: usize = 64;
const N_BLOCKS: usize = 64;

/// An allocator which can be shared between threads safely. It only allows allocating slices of AtomicU32s.
/// The length of the slice can be specified when calling [`allocate()`](BlockAllocator::allocate).
///
/// To clean up all the allocations, just [`drop`] the BlockAllocator.
pub struct BlockAllocator {
    /// Index of the current block in `self.blocks`
    block_idx: AtomicUsize,
    /// Pointers to the blocks. These all start out as null pointers; when a new
    /// memory block is needed, the first null pointer is atomically swapped to
    /// point at a vector filled with zeros.
    /// (The first element of each block is used as the index.)
    blocks: Vec<AtomicPtr<Vec<AtomicU32>>>,
}

impl BlockAllocator {
    /// Create a new BlockAllocator.
    pub fn new() -> Self {
        let mut blocks = vec![];
        for _ in 0..N_BLOCKS {
            blocks.push(AtomicPtr::new(std::ptr::null_mut()));
        }

        BlockAllocator {
            block_idx: AtomicUsize::new(0),
            blocks,
        }
    }

    /// Create an allocation for `n_values`.
    pub fn allocate(&self, n_values: usize) -> &[AtomicU32] {
        assert!(
            n_values <= BLOCK_SIZE,
            "Can not allocate a slice larger than the BLOCK_SIZE."
        );
        for _ in 0..N_BLOCKS {
            let block_idx = self.block_idx.load(Acquire);
            let vec_ptr = self.blocks[block_idx].load(Acquire) as *const Vec<AtomicU32>;

            if vec_ptr.is_null() {
                self.allocate_current_block();
                continue;
            }

            // Safety: vec_ptr is non-null because we continue each time it is null and
            // it is a valid allocation done by self.allocate_current_block()
            let vec = unsafe { &*vec_ptr };
            let start_idx = vec[0].fetch_add(n_values as u32, Release) as usize;

            if start_idx + n_values > BLOCK_SIZE {
                // the allocation would be longer than the length of the block so we move onto the next block
                let next_block_idx = block_idx + 1;

                if next_block_idx + 1 == N_BLOCKS {
                    panic!("Maximum number of blocks exceeded.");
                }

                let _ = self.block_idx.compare_exchange_weak(
                    block_idx,
                    next_block_idx,
                    Release,
                    Relaxed,
                );
            } else {
                // Safety: vec has enough elements because if it doesn't then the
                // other branch of the if is entered.
                return unsafe {
                    std::slice::from_raw_parts(vec.as_ptr().add(start_idx), n_values)
                };
            }
        }
        unreachable!();
    }

    fn allocate_current_block(&self) {
        let block_idx = self.block_idx.load(Acquire);

        if !self.blocks[block_idx].load(Acquire).is_null() {
            // the current block is already allocated so return quickly
            return;
        }

        let vec_box_ptr = Box::into_raw(Box::new(Self::zeroed_vec()));

        let cas = self.blocks[block_idx].compare_exchange(
            std::ptr::null_mut(),
            vec_box_ptr,
            Release,
            Relaxed,
        );

        if cas.is_err() {
            // cas failed meaning some other thread already allocated this block
            // so we drop the new allocation

            // Safety: we allocated `vec_box_ptr` above and leaked it with `Box::into_raw` and
            // no other pointers/references to the underlying vector exist.
            let new_vec = unsafe { Box::from_raw(vec_box_ptr) };
            drop(new_vec);
        }
    }

    fn zeroed_vec() -> Vec<AtomicU32> {
        // the first element in the block is the index one past the end of the
        // last allocation; it starts at 1 so we never hand out a slice that
        // includes the index element itself
        let mut vec = vec![AtomicU32::new(1)];
        for _ in 0..BLOCK_SIZE - 1 {
            vec.push(AtomicU32::new(0));
        }
        vec
    }
}

impl Drop for BlockAllocator {
    fn drop(&mut self) {
        for block in &mut self.blocks {
            let vec_ptr = block.load(Acquire) as *mut Vec<AtomicU32>;

            if vec_ptr.is_null() {
                continue;
            }

            // Safety: `vec_ptr` is non-null, and the only way it can be non-null
            // is if it was allocated by `Self::allocate_current_block`
            let new_vec = unsafe { Box::from_raw(vec_ptr) };
            drop(new_vec);
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;

    #[test]
    fn allocate_single_thread() {
        let alloc = BlockAllocator::new();
        for _ in 0..64 * 3 {
            alloc.allocate(10);
        }
    }

    #[test]
    fn allocate_multi_thread() {
        let alloc = Arc::new(BlockAllocator::new());

        let ths: Vec<_> = (0..3)
            .map(|_| {
                let alloc = alloc.clone();
                std::thread::spawn(move || {
                    for _ in 0..2 {
                        alloc.allocate(60);
                    }
                })
            })
            .collect();

        for th in ths {
            th.join().unwrap();
        }
    }
}

And also a playground link

I am most interested in finding out if my code is sound or not.

But if there are simplifications, more Rusty ways, or maybe even faster ways to do some things, I would be very happy to hear those too.

I will make the docs better, for example to include the ways allocate can panic...

I'm not sure if it changes anything (probably not), but I would have used an AtomicPtr here.


It can be a signal of a livelock. Try replacing loop { ... } with something like for _ in 0..100 { ... } followed by a panic!() and see what happens with loom.
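Something like this is what I mean (just a sketch with made-up names, not your allocator code):

use std::sync::atomic::{AtomicUsize, Ordering};

// Hypothetical helper: atomically bump `counter`, but give up after a
// fixed number of failed attempts instead of spinning forever.
fn bump_bounded(counter: &AtomicUsize) -> usize {
    for _ in 0..100 {
        let cur = counter.load(Ordering::Acquire);
        if counter
            .compare_exchange(cur, cur + 1, Ordering::AcqRel, Ordering::Acquire)
            .is_ok()
        {
            return cur;
        }
        // CAS failed because another thread got in between: retry
    }
    // If loom can drive some interleaving into this panic, there is a
    // schedule in which this thread never makes progress.
    panic!("no progress after 100 retries");
}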

That is a good idea! In fact, we know that the maximum number of times we should retry is N_BLOCKS.

The loom test is running really long again, just like before...
With --nocapture it prints these:

================== Iteration 860000 ==================
================== Iteration 880000 ==================
================== Iteration 900000 ==================
================== Iteration 920000 ==================
So maybe that means it is good? I am not sure.

Good idea, it is much nicer that way.

It means the algorithm is not actually a lock-free algorithm. Despite the name, lock-free doesn't just mean the algorithm doesn't use any explicit lock; it means that at least one thread is always guaranteed to make progress. Take a look at this Wikipedia article.

The loom iteration count exploding means there exists some execution sequence in which no thread makes any progress, which keeps the test case looping.
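To illustrate the distinction with a small sketch (made-up functions, not from your allocator):

use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};

// Lock-free: a failed CAS means the value changed, i.e. some other
// thread made progress, so the system as a whole always advances.
fn lock_free_increment(n: &AtomicUsize) {
    let mut cur = n.load(Ordering::Relaxed);
    while let Err(actual) =
        n.compare_exchange(cur, cur + 1, Ordering::AcqRel, Ordering::Relaxed)
    {
        cur = actual;
    }
}

// Not lock-free: if the thread that set `busy` is suspended, everyone
// else spins forever and no thread makes progress.
fn spin_until_free(busy: &AtomicBool) {
    while busy
        .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
        .is_err()
    {}
}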


Interesting. I have no idea what that sequence could be. Even with everything SeqCst it keeps doing that, and it isn't the compare_exchange_weak either. I'll use compare_exchange for now, because loom might make the weak version spuriously fail every time? :thinking:
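For reference, here is a minimal sketch of the difference in question (an illustration, not my allocator code): compare_exchange only fails when the current value actually differs, while compare_exchange_weak may also fail spuriously, which is why it normally sits in a retry loop.

use std::sync::atomic::{AtomicUsize, Ordering};

fn main() {
    let flag = AtomicUsize::new(0);

    // Strong CAS: fails only if the current value is not 0.
    flag.compare_exchange(0, 1, Ordering::AcqRel, Ordering::Acquire)
        .unwrap();

    // Weak CAS: may fail spuriously even though the value is 1, so it
    // normally sits in a retry loop. A model checker is free to choose
    // the spurious-failure outcome on every attempt, which multiplies
    // the schedules it has to explore.
    while flag
        .compare_exchange_weak(1, 2, Ordering::AcqRel, Ordering::Acquire)
        .is_err()
    {}

    assert_eq!(flag.load(Ordering::Acquire), 2);
}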

Here is the version with a loom test:

Loom
use loom::sync::atomic::AtomicPtr;
use loom::sync::atomic::AtomicU32;
use loom::sync::atomic::AtomicUsize;
use loom::sync::atomic::Ordering::*;

const BLOCK_SIZE: usize = 64;
const N_BLOCKS: usize = 64;

/// An allocator which can be shared between threads safely. It only allows allocating slices of AtomicU32s.
/// The length of the slice can be specified when calling [`allocate()`](BlockAllocator::allocate).
///
/// To clean up all the allocations, just [`drop`] the BlockAllocator.
pub struct BlockAllocator {
    /// Index of the current block in `self.blocks`
    block_idx: AtomicUsize,
    /// Pointers to the blocks. These all start out as null pointers; when a new
    /// memory block is needed, the first null pointer is atomically swapped to
    /// point at a vector filled with zeros.
    /// (The first element of each block is used as the index.)
    blocks: Vec<AtomicPtr<Vec<AtomicU32>>>,
}

impl BlockAllocator {
    /// Create a new BlockAllocator.
    pub fn new() -> Self {
        let mut blocks = vec![];
        for _ in 0..N_BLOCKS {
            blocks.push(AtomicPtr::new(std::ptr::null_mut()));
        }

        BlockAllocator {
            block_idx: AtomicUsize::new(0),
            blocks,
        }
    }

    /// Create an allocation for `n_values`.
    pub fn allocate(&self, n_values: usize) -> &[AtomicU32] {
        assert!(
            n_values <= BLOCK_SIZE,
            "Can not allocate a slice larger than the BLOCK_SIZE."
        );
        for _ in 0..N_BLOCKS {
            let block_idx = self.block_idx.load(SeqCst);
            let vec_ptr = self.blocks[block_idx].load(SeqCst) as *const Vec<AtomicU32>;

            if vec_ptr.is_null() {
                self.allocate_current_block();
                continue;
            }

            // Safety: vec_ptr is non-null because we continue each time it is null and
            // it is a valid allocation done by self.allocate_current_block()
            let vec = unsafe { &*vec_ptr };
            let start_idx = vec[0].fetch_add(n_values as u32, SeqCst) as usize;

            if start_idx + n_values > BLOCK_SIZE {
                // the allocation would be longer than the length of the block so we move onto the next block
                let next_block_idx = block_idx + 1;

                if next_block_idx + 1 == N_BLOCKS {
                    panic!("Maximum number of blocks exceeded.");
                }

                let _ = self
                    .block_idx
                    .compare_exchange(block_idx, next_block_idx, SeqCst, SeqCst);
            } else {
                // Safety: vec has enough elements because if it doesn't then the
                // other branch of the if is entered.
                return unsafe {
                    std::slice::from_raw_parts(vec.as_ptr().add(start_idx), n_values)
                };
            }
        }
        unreachable!();
    }

    fn allocate_current_block(&self) {
        let block_idx = self.block_idx.load(SeqCst);

        if !self.blocks[block_idx].load(SeqCst).is_null() {
            // the current block is already allocated so return quickly
            return;
        }

        let vec_box_ptr = Box::into_raw(Box::new(Self::zeroed_vec()));

        let cas = self.blocks[block_idx].compare_exchange(
            std::ptr::null_mut(),
            vec_box_ptr,
            SeqCst,
            SeqCst,
        );

        if cas.is_err() {
            // cas failed meaning some other thread already allocated this block
            // so we drop the new allocation

            // Safety: we allocated `vec_box_ptr` above and leaked it with `Box::into_raw` and
            // no other pointers/references to the underlying vector exist.
            let new_vec = unsafe { Box::from_raw(vec_box_ptr) };
            drop(new_vec);
        }
    }

    fn zeroed_vec() -> Vec<AtomicU32> {
        // the first element in the block is the index one past the end of the
        // last allocation; it starts at 1 so we never hand out a slice that
        // includes the index element itself
        let mut vec = vec![AtomicU32::new(1)];
        for _ in 0..BLOCK_SIZE - 1 {
            vec.push(AtomicU32::new(0));
        }
        vec
    }
}

impl Drop for BlockAllocator {
    fn drop(&mut self) {
        for block in &mut self.blocks {
            let vec_ptr = block.load(SeqCst) as *mut Vec<AtomicU32>;

            if vec_ptr.is_null() {
                continue;
            }

            // Safety: `vec_ptr` is non-null, and the only way it can be non-null
            // is if it was allocated by `Self::allocate_current_block`
            let new_vec = unsafe { Box::from_raw(vec_ptr) };
            drop(new_vec);
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use loom::sync::Arc;

    #[test]
    fn allocate_loom() {
        loom::model(|| {
            let alloc = Arc::new(BlockAllocator::new());

            let ths: Vec<_> = (0..3)
                .map(|_| {
                    let alloc = alloc.clone();
                    loom::thread::spawn(move || {
                        let mut ptrs = vec![];
                        for _ in 0..10 {
                            let slice = alloc.allocate(10);
                            ptrs.push(slice.as_ptr() as usize);
                        }
                        ptrs
                    })
                })
                .collect();

            let mut all_ptrs: Vec<usize> = vec![];
            for th in ths {
                let ptrs = th.join().unwrap();
                all_ptrs.extend(ptrs.iter());
            }

            let orig = all_ptrs.clone();

            all_ptrs.sort_unstable();
            all_ptrs.dedup();

            // check that no duplicate start idxs were given out
            assert_eq!(orig.len(), all_ptrs.len());
            assert_eq!(orig.len(), 3 * 10);
        });
    }
}

Okay, I found out that if we have more than two threads or allocate more than a single block, then loom runs for a really long time (I am not sure whether it even finishes).

For example, three threads allocating one slice each already runs long.

Using LOOM_MAX_PREEMPTIONS=3 when running the tests makes them finish fast.
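The same bound can also be set from code with loom's Builder instead of the environment variable (a sketch; the closure body would be the same as in my loom test above):

#[test]
fn allocate_loom_bounded() {
    let mut builder = loom::model::Builder::new();
    // Same effect as LOOM_MAX_PREEMPTIONS=3: bound how many times a
    // thread may be preempted per execution, which keeps the explored
    // state space tractable at the cost of exhaustiveness.
    builder.preemption_bound = Some(3);
    builder.check(|| {
        // ... same body as in the loom::model(|| { ... }) test above ...
    });
}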
