I have the following implementation of a heapless, lockless, multiple-producer multiple-consumer (MPMC) thread-safe queue. The implementation is designed to avoid the ABA problem. I know that the SeqCst memory ordering is not strictly needed, but it is used here to simplify the analysis.
My first question is: why does my unit test fail randomly? I fill the queue to capacity at the beginning, and I use a guard struct so that every value removed by dequeue is put back by enqueue when the guard is dropped. I therefore expect that an enqueue can never fail, but in practice it sometimes does.
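My reasoning: every thread holds at most one dequeued value at a time and puts it straight back, so the number of live values never exceeds the capacity and an enqueue should always find a free slot. Each test thread effectively runs the loop below (a simplified sketch of the real test at the bottom, where the enqueue actually happens in the guard's Drop implementation):

for _ in 0..ELEMENTS {
    if let Some(value) = QUEUE.dequeue() {
        // A slot was just freed by the dequeue, so I expect this to never fail.
        QUEUE.enqueue(value).expect("Should not fail here");
    }
}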
My second question is: what is the weakest memory ordering that is still correct for each atomic operation?
My environment is x86_64 Linux. Thanks a lot.
use core::sync::atomic::{AtomicUsize, Ordering};
use core::{cell::UnsafeCell, mem::MaybeUninit};
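/// A bounded MPMC queue in the style of Dmitry Vyukov's bounded queue: `head` and
/// `tail` are ever-increasing (wrapping) positions, the buffer index is the position
/// modulo `N`, and each cell carries a sequence number that encodes whose turn it is
/// (producer or consumer, and for which lap) to use that cell.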
pub struct MpMcQueue<T, const N: usize> {
buffer: [Cell<T>; N],
head: AtomicUsize,
tail: AtomicUsize,
}
impl<T, const N: usize> MpMcQueue<T, N> {
const EMPTY_CELL: Cell<T> = Cell::new(0);
/// Creates an empty queue
#[must_use]
pub const fn new() -> Self {
let mut cell_count = 0;
let mut result_cells: [Cell<T>; N] = [Self::EMPTY_CELL; N];
while cell_count != N {
result_cells[cell_count] = Cell::new(cell_count);
cell_count += 1;
}
Self {
buffer: result_cells,
head: AtomicUsize::new(0),
tail: AtomicUsize::new(0),
}
}
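    /// Takes the item at `head`, if any. A cell is ready to be consumed when its
    /// sequence number equals the position plus one (set by the producer); after the
    /// value is read, the cell is released for the next lap by storing `pos + N`.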
pub fn dequeue(&self) -> Option<T> {
let mut pos;
let mut cell;
loop {
pos = self.head.load(Ordering::SeqCst);
cell = &self.buffer[pos % N];
let seq = cell.sequence.load(Ordering::SeqCst);
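            // seq == pos + 1: a producer has published data for this position.
            // seq <  pos + 1: no data has been published for this position yet, so the queue looks empty.
            // seq >  pos + 1: this position was already consumed; the loaded head is stale, retry.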
match seq.cmp(&pos.wrapping_add(1)) {
core::cmp::Ordering::Equal => {
match self.head.compare_exchange(
pos,
pos.wrapping_add(1),
Ordering::SeqCst,
Ordering::SeqCst,
) {
Ok(_) => {
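                            // This consumer now owns the cell: read the value out, then free the
                            // cell for the producer that will use this position on the next lap.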
let data = unsafe { (*cell.data.get()).assume_init_read() };
cell.sequence.store(pos.wrapping_add(N), Ordering::SeqCst);
return Some(data);
}
Err(_) => (),
}
}
core::cmp::Ordering::Less => {
return None;
}
core::cmp::Ordering::Greater => {}
}
}
}
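    /// Inserts the item at `tail`. A cell is free for this lap when its sequence
    /// number equals the position; after the value is written, it is published to
    /// consumers by storing `pos + 1`.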
pub fn enqueue(&self, item: T) -> Result<(), T> {
let mut pos;
let mut cell;
loop {
pos = self.tail.load(Ordering::SeqCst);
cell = &self.buffer[pos % N];
let seq = cell.sequence.load(Ordering::SeqCst);
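            // seq == pos: the cell is free for this lap and can be claimed.
            // seq <  pos: the cell has not been released by a consumer yet, so the queue looks full.
            // seq >  pos: another producer already claimed this position; the loaded tail is stale, retry.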
match seq.cmp(&pos) {
core::cmp::Ordering::Equal => {
match self.tail.compare_exchange(
pos,
pos.wrapping_add(1),
Ordering::SeqCst,
Ordering::SeqCst,
) {
Ok(_) => {
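                            // This producer now owns the cell: write the value, then publish it to
                            // consumers by bumping the sequence number to pos + 1.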
unsafe { (*cell.data.get()).write(item) };
cell.sequence.store(pos.wrapping_add(1), Ordering::SeqCst);
return Ok(());
}
Err(_) => core::hint::spin_loop(),
}
}
core::cmp::Ordering::Less => {
return Err(item);
}
core::cmp::Ordering::Greater => {}
}
}
}
}
unsafe impl<T: Send, const N: usize> Send for MpMcQueue<T, N> {}
unsafe impl<T: Send, const N: usize> Sync for MpMcQueue<T, N> {}
impl<T, const N: usize> Drop for MpMcQueue<T, N> {
fn drop(&mut self) {
while self.dequeue().is_some() {}
}
}
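/// One slot of the ring buffer: the payload plus the sequence number that encodes
/// whose turn it is (producer or consumer, and for which lap) to use the slot.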
struct Cell<T> {
data: UnsafeCell<MaybeUninit<T>>,
sequence: AtomicUsize,
}
impl<T> Cell<T> {
const fn new(seq: usize) -> Self {
Self {
data: UnsafeCell::new(MaybeUninit::uninit()),
sequence: AtomicUsize::new(seq),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn stress() {
const THREADS: usize = 16;
const ELEMENTS: usize = 1000;
const CAPACITY: usize = 4;
static QUEUE: MpMcQueue<usize, CAPACITY> = MpMcQueue::<usize, CAPACITY>::new();
// Make the queue full at the beginning
for i in 0..CAPACITY {
QUEUE.enqueue(i).unwrap();
}
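        // Guard that puts the dequeued value back into the queue when it is dropped.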
struct Foo {
value: usize,
reference: &'static MpMcQueue<usize, CAPACITY>,
}
impl Foo {
fn get() -> Option<Self> {
QUEUE.dequeue().map(|value| Self {
value,
reference: &QUEUE,
})
}
}
impl Drop for Foo {
fn drop(&mut self) {
// Expect this panic to never happen
self.reference
.enqueue(self.value)
.expect("Should not fail here");
}
}
std::thread::scope(|s| {
for _ in 0..THREADS {
s.spawn(|| {
for _ in 0..ELEMENTS {
Foo::get();
}
});
}
});
}
}