I am learning atomics, as I am very interested in wait-free data structures.
I have used Miri before to find bugs in my unsafe code, and I heard that loom is good at catching the kinds of atomics-related problems one can run into, so I tried both on my allocator.
For practice, I wrote an allocator. All it does is allocate blocks of memory from which it hands out slices. What makes it more interesting is that the allocator can be shared between threads and allocation is lock-free.
I think I used loom correctly, but because my code has so many atomic accesses and branches, loom just kept iterating: it still hadn't finished after 5 minutes, so I stopped it myself.
Miri didn't find anything, so at least I have that (I know it doesn't prove much, but still).
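For reference, this is roughly how the two tools are invoked (assuming a normal cargo project; loom only takes effect when the tests are compiled with `--cfg loom`, and its documented `LOOM_MAX_PREEMPTIONS` variable bounds the state-space search, which might help with runs that never finish):

```shell
# run the test suite under Miri (requires a nightly toolchain)
cargo +nightly miri test

# run the loom tests; loom is only active when compiled with --cfg loom,
# and LOOM_MAX_PREEMPTIONS bounds how many preemptions are explored
RUSTFLAGS="--cfg loom" LOOM_MAX_PREEMPTIONS=3 cargo test --release
```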
My Allocator
use std::sync::atomic::AtomicPtr;
use std::sync::atomic::AtomicU32;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::*;

const BLOCK_SIZE: usize = 64;
const N_BLOCKS: usize = 64;

/// An allocator which can be shared between threads safely. It only allows allocating slices of AtomicU32s.
/// The length of the slice can be specified when calling [`allocate()`](BlockAllocator::allocate).
///
/// To clean up all the allocations just [`drop`] the BlockAllocator.
pub struct BlockAllocator {
    /// Index of the current block in `self.blocks`
    block_idx: AtomicUsize,
    /// Pointers to the blocks, all null by default; when a new memory block is needed,
    /// the first null pointer is atomically swapped to point to a newly allocated,
    /// zero-filled vector (the first element of each block is used as the bump index).
    blocks: Vec<AtomicPtr<Vec<AtomicU32>>>,
}

impl BlockAllocator {
    /// Create a new BlockAllocator.
    pub fn new() -> Self {
        let mut blocks = vec![];
        for _ in 0..N_BLOCKS {
            blocks.push(AtomicPtr::new(std::ptr::null_mut()));
        }
        BlockAllocator {
            block_idx: AtomicUsize::new(0),
            blocks,
        }
    }

    /// Create an allocation for `n_values`.
    pub fn allocate(&self, n_values: usize) -> &[AtomicU32] {
        assert!(
            n_values <= BLOCK_SIZE,
            "Cannot allocate a slice larger than BLOCK_SIZE."
        );
        for _ in 0..N_BLOCKS {
            let block_idx = self.block_idx.load(Acquire);
            let vec_ptr = self.blocks[block_idx].load(Acquire) as *const Vec<AtomicU32>;
            if vec_ptr.is_null() {
                self.allocate_current_block();
                continue;
            }
            // Safety: vec_ptr is non-null because we continue each time it is null and
            // it is a valid allocation done by self.allocate_current_block()
            let vec = unsafe { &*vec_ptr };
            let start_idx = vec[0].fetch_add(n_values as u32, Release) as usize;
            if start_idx + n_values > BLOCK_SIZE {
                // the allocation would run past the end of the block, so we move on to the next block
                let next_block_idx = block_idx + 1;
                if next_block_idx == N_BLOCKS {
                    panic!("Maximum number of blocks exceeded.");
                }
                let _ = self.block_idx.compare_exchange_weak(
                    block_idx,
                    next_block_idx,
                    Release,
                    Relaxed,
                );
            } else {
                // Safety: vec has enough elements because if it doesn't then the
                // other branch of the if is entered.
                return unsafe {
                    std::slice::from_raw_parts(vec.as_ptr().add(start_idx), n_values)
                };
            }
        }
        unreachable!();
    }

    fn allocate_current_block(&self) {
        let block_idx = self.block_idx.load(Acquire);
        if !self.blocks[block_idx].load(Acquire).is_null() {
            // the current block is already allocated so return quickly
            return;
        }
        let vec_box_ptr = Box::into_raw(Box::new(Self::zeroed_vec()));
        let cas = self.blocks[block_idx].compare_exchange(
            std::ptr::null_mut(),
            vec_box_ptr,
            Release,
            Relaxed,
        );
        if cas.is_err() {
            // the CAS failed, meaning some other thread already allocated this block,
            // so we drop the new allocation
            // Safety: we allocated `vec_box_ptr` above and leaked it with `Box::into_raw` and
            // no other pointers/references to the underlying vector exist.
            let new_vec = unsafe { Box::from_raw(vec_box_ptr) };
            drop(new_vec);
        }
    }

    fn zeroed_vec() -> Vec<AtomicU32> {
        // the first element of the block stores the bump index (one past the end of
        // the last allocation); it starts at 1 so the index slot itself is never handed out
        let mut vec = vec![AtomicU32::new(1)];
        for _ in 0..BLOCK_SIZE - 1 {
            vec.push(AtomicU32::new(0));
        }
        vec
    }
}

impl Drop for BlockAllocator {
    fn drop(&mut self) {
        for block in &mut self.blocks {
            let vec_ptr = block.load(Acquire) as *mut Vec<AtomicU32>;
            if vec_ptr.is_null() {
                continue;
            }
            // Safety: we checked that `vec_ptr` is allocated by checking that it is non-null,
            // and the only way it is non-null is if it was allocated by `Self::allocate_current_block`
            let new_vec = unsafe { Box::from_raw(vec_ptr) };
            drop(new_vec);
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;

    #[test]
    fn allocate_single_thread() {
        let alloc = BlockAllocator::new();
        for _ in 0..64 * 3 {
            alloc.allocate(10);
        }
    }

    #[test]
    fn allocate_multi_thread() {
        let alloc = Arc::new(BlockAllocator::new());
        let ths: Vec<_> = (0..3)
            .map(|_| {
                let alloc = alloc.clone();
                std::thread::spawn(move || {
                    for _ in 0..2 {
                        alloc.allocate(60);
                    }
                })
            })
            .collect();
        for th in ths {
            th.join().unwrap();
        }
    }
}
And also a playground link
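Edit: to make the part I'm least sure about easier to discuss in isolation, here is a minimal, self-contained sketch of just the reservation pattern `allocate` relies on: a shared cursor bumped with `fetch_add`, with the bounds check done only after the bump. The function name `reserve_all` and the parameters are made up for the example; it is not the allocator itself, just the core idea.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

const BLOCK_SIZE: usize = 64;

/// Spawn `n_threads` threads that each reserve `n` slots by bumping a shared
/// cursor with `fetch_add`, checking bounds only *after* the bump (the same
/// pattern `allocate` uses). Returns the successful reservations, sorted.
fn reserve_all(n_threads: usize, n: usize) -> Vec<(usize, usize)> {
    let cursor = Arc::new(AtomicUsize::new(1)); // slot 0 is the cursor itself
    let handles: Vec<_> = (0..n_threads)
        .map(|_| {
            let cursor = Arc::clone(&cursor);
            thread::spawn(move || {
                let start = cursor.fetch_add(n, Ordering::Relaxed);
                // the reservation only counts if it fits inside the block
                (start + n <= BLOCK_SIZE).then_some((start, start + n))
            })
        })
        .collect();
    let mut ranges: Vec<_> = handles
        .into_iter()
        .filter_map(|h| h.join().unwrap())
        .collect();
    ranges.sort();
    ranges
}

fn main() {
    let ranges = reserve_all(4, 10);
    // fetch_add hands out distinct starting offsets, so successful
    // reservations can never overlap, regardless of interleaving
    for pair in ranges.windows(2) {
        assert!(pair[0].1 <= pair[1].0, "overlapping reservations");
    }
    println!("{} non-overlapping reservations", ranges.len());
}
```

Because `fetch_add` returns a distinct starting offset to each thread, the four reservations here are always 1..11, 11..21, 21..31, and 31..41 in some order, so the program prints `4 non-overlapping reservations`.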