I've coded the following thread-safe arena for fun and to practice atomics (thanks @Mara for the excellent book!). I know there is a bug, because if I set RANGE
to a very large number (I recommend 10_000_000) it sometimes fails with STATUS_ACCESS_VIOLATION
(on Windows), but I can't spot it, and I have no idea how to debug something like this:
#![forbid(unsafe_op_in_unsafe_fn, rust_2018_idioms)]

use std::alloc::Layout;
use std::cell::UnsafeCell;
use std::sync::atomic::{AtomicPtr, Ordering};

/// A non-dropping thread-safe arena.
pub struct Arena<T> {
    current_chunk: AtomicPtr<T>,
    current_chunk_end: AtomicPtr<T>,
    /// `null` if we are currently allocating a new chunk.
    next_to_alloc: AtomicPtr<T>,
    /// The number of `T`s every chunk can contain.
    capacity: usize,
    allocated_chunks: UnsafeCell<Vec<*mut T>>,
}

// SAFETY: This is the point.
unsafe impl<T: Send> Sync for Arena<T> {}

const DEFAULT_CAPACITY: usize = 1000;
impl<T> Arena<T> {
    const EMPTY_ARENA_SENTINEL: *mut T = 1 as *mut T;

    #[inline]
    pub const fn new() -> Self {
        Self::with_capacity(DEFAULT_CAPACITY)
    }

    /// Takes the number of `T`s a chunk will contain.
    #[inline]
    #[track_caller]
    pub const fn with_capacity(capacity: usize) -> Self {
        // This is not required, but put here for better error reporting. This is equivalent to
        // `Layout::array::<T>(capacity).unwrap()`, but const.
        match capacity.checked_mul(std::mem::size_of::<T>()) {
            Some(v) if v > isize::MAX as usize => panic!("too big capacity"),
            None => panic!("too big capacity"),
            _ => {}
        }
        assert!(capacity > 0, "capacity cannot be zero");
        assert!(
            std::mem::size_of::<T>() != 0,
            "zero-sized types are not supported",
        );

        Self {
            current_chunk: AtomicPtr::new(Self::EMPTY_ARENA_SENTINEL),
            current_chunk_end: AtomicPtr::new(Self::EMPTY_ARENA_SENTINEL),
            next_to_alloc: AtomicPtr::new(Self::EMPTY_ARENA_SENTINEL),
            capacity,
            allocated_chunks: UnsafeCell::new(Vec::new()),
        }
    }
    fn alloc_new_chunk(capacity: usize) -> (*mut T, *mut T) {
        let layout = Layout::array::<T>(capacity).expect("too big capacity");
        // SAFETY: We verified in `with_capacity()` that the capacity is not zero and the type is not a ZST.
        let chunk = unsafe { std::alloc::alloc(layout) }.cast::<T>();
        if chunk.is_null() {
            // We cannot call `handle_alloc_error()` because it can cause a recoverable panic but we are left
            // in invalid state as the current chunk is duplicated on `current_chunk` and `allocated_chunks`.
            // We could recover to a valid state, this is a matter of putting a guard in the right place, but
            // I didn't bother as this is not production code.
            std::process::abort();
        }
        // Shouldn't happen, but just to be extra safe.
        assert_ne!(
            chunk,
            Self::EMPTY_ARENA_SENTINEL,
            "got allocation at address {}??",
            Self::EMPTY_ARENA_SENTINEL as usize,
        );
        // SAFETY: It is one past the end.
        let chunk_end = unsafe { chunk.add(capacity) };
        (chunk, chunk_end)
    }
    #[inline]
    fn alloc_place(&self) -> *mut T {
        let mut next_to_alloc = self.next_to_alloc.load(Ordering::Acquire);
        // We can get an incorrect `chunk_end` during allocation of a new chunk. However,
        // if this happens `next_to_alloc` will be null, so we will spin until the
        // chunk has been fully allocated and then update `chunk_end`.
        let mut chunk_end = self.current_chunk_end.load(Ordering::Relaxed);
        loop {
            if next_to_alloc.is_null() {
                // Somebody is allocating a new chunk, wait for them.
                loop {
                    std::hint::spin_loop();
                    next_to_alloc = self.next_to_alloc.load(Ordering::Relaxed);
                    if !next_to_alloc.is_null() {
                        break;
                    }
                }
                // fence(Ordering::Acquire);
                // For thread sanitizer, as it doesn't support fences.
                self.next_to_alloc.load(Ordering::Acquire);
                chunk_end = self.current_chunk_end.load(Ordering::Relaxed);
            }
            // This must come after the previous check, so that if we were in the middle of allocating
            // a new chunk but it quickly ran out of space before we had a chance to use it, we will
            // allocate a new chunk.
            if next_to_alloc == chunk_end {
                return self.grow_by_chunk(next_to_alloc);
            }
            // SAFETY: This is either in bounds or one past the end.
            let after = unsafe { next_to_alloc.add(1) };
            // `Ordering::Relaxed` is not enough for the success ordering, we need `Release`. This is
            // because we need to form a happens-before relationship between all updates of `next_to_alloc`
            // and the load of it as part of the `compare_exchange()` in `grow_by_chunk()`. If the
            // stores do not happen-before this load, we may observe a corrupted `chunk_end`
            // in `alloc_place()` because of a later `grow_by_chunk()`.
            match self.next_to_alloc.compare_exchange_weak(
                next_to_alloc,
                after,
                Ordering::Release,
                Ordering::Acquire,
            ) {
                Ok(_) => return next_to_alloc,
                Err(v) => next_to_alloc = v,
            }
        }
    }
    // Allocates a new chunk and does all bookkeeping.
    #[cold]
    #[inline(never)]
    fn grow_by_chunk(&self, expected_next_to_alloc: *mut T) -> *mut T {
        if self
            .next_to_alloc
            .compare_exchange(
                expected_next_to_alloc,
                std::ptr::null_mut(),
                Ordering::Acquire,
                Ordering::Relaxed,
            )
            .is_err()
        {
            // If `next_to_alloc` is no longer equal to the chunk end, that means someone was faster than
            // us and already took care of allocating a new chunk. So let's wait for them to finish
            // (`alloc_place()` will do that) and retry allocating (it cannot be someone who further
            // increased `next_to_alloc`, because once we reach the chunk end we don't advance anymore).
            return self.alloc_place();
        }

        let (chunk, chunk_end) = Self::alloc_new_chunk(self.capacity);
        let result = chunk;
        // SAFETY: We verify in `with_capacity()` that the capacity is at least one, so this is either
        // in bounds or one past the end.
        let chunk_rest = unsafe { chunk.add(1) };
        let old_chunk = self.current_chunk.load(Ordering::Relaxed);
        if old_chunk != Self::EMPTY_ARENA_SENTINEL {
            // SAFETY: We're holding a "lock" as `next_to_alloc` is null, so we have exclusive access
            // to `allocated_chunks` during this time period.
            unsafe {
                (*self.allocated_chunks.get()).push(old_chunk);
            }
        }
        self.current_chunk.store(chunk, Ordering::Relaxed);
        self.current_chunk_end.store(chunk_end, Ordering::Relaxed);
        self.next_to_alloc.store(chunk_rest, Ordering::Release);
        result
    }
    #[inline]
    pub fn alloc(&self, v: T) -> &mut T {
        let place = self.alloc_place();
        // SAFETY: `alloc_place()` always gives us a valid pointer.
        unsafe {
            place.write(v);
            &mut *place
        }
    }
}

impl<T> Drop for Arena<T> {
    fn drop(&mut self) {
        let layout = Layout::array::<T>(self.capacity).unwrap();
        self.allocated_chunks
            .get_mut()
            .iter()
            .copied()
            .chain(
                Some(*self.current_chunk.get_mut())
                    .filter(|&chunk| chunk != Self::EMPTY_ARENA_SENTINEL),
            )
            .for_each(|chunk| {
                // SAFETY: We allocated each chunk with the global allocator and `Layout::array::<T>(self.capacity)`.
                unsafe {
                    std::alloc::dealloc(chunk.cast::<u8>(), layout);
                }
            });
    }
}
fn main() {
    let arena = Arena::new();
    dbg!(arena.alloc(123u64));
    let arena = &arena;
    let counter = &std::sync::atomic::AtomicU64::new(0);
    const RANGE: u64 = 10_000_000;
    std::thread::scope(|s| {
        for i in 0..10 {
            s.spawn(move || {
                counter.fetch_add(
                    (i * RANGE..(i * RANGE) + RANGE)
                        .map(|j| arena.alloc(j))
                        .collect::<Vec<_>>()
                        .into_iter()
                        .map(|&mut v| v)
                        .sum::<u64>(),
                    Ordering::Relaxed,
                )
            });
        }
    });
    dbg!(counter.load(Ordering::Relaxed));
}
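Two side notes on main(): when nothing goes wrong, the final counter should be 0 + 1 + … + 99_999_999 = 4_999_999_950_000_000, so a run that doesn't crash can at least be checked against that number. And I'd guess, though I haven't verified it, that a much smaller chunk capacity makes grow_by_chunk() run far more often and so reproduces the crash faster. Something like this (a hypothetical, untested sketch; the capacity of 8 and the smaller RANGE are arbitrary picks, not part of the arena itself):

fn stress() {
    // Same workload as main(), but each chunk holds only 8 values instead of the
    // default 1000, so chunk growth happens constantly.
    let arena = &Arena::<u64>::with_capacity(8);
    let counter = &std::sync::atomic::AtomicU64::new(0);
    const RANGE: u64 = 1_000_000;
    std::thread::scope(|s| {
        for i in 0..10 {
            s.spawn(move || {
                counter.fetch_add(
                    (i * RANGE..(i * RANGE) + RANGE)
                        .map(|j| arena.alloc(j))
                        .collect::<Vec<_>>()
                        .into_iter()
                        .map(|&mut v| v)
                        .sum::<u64>(),
                    Ordering::Relaxed,
                );
            });
        }
    });
    // Expected when nothing goes wrong: 0 + 1 + ... + 9_999_999 = 49_999_995_000_000.
    dbg!(counter.load(Ordering::Relaxed));
}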
I also tried replacing all the orderings with SeqCst,
but it still sometimes crashes, so this is probably not a problem with the memory orderings.
I'd also appreciate any other bug reports or comments.