I tried to implement a cocurrently-shared queue support multi-readr and multi-writer, with the a Mutex
wrapped VecDeque
, a Condvar
for readers and one for writers. However, after sometimes they stuck, and I cannot figure out why.
core struct:
struct Queue {
queue: Mutex<VecDeque<i32>>,
capacity: usize,
writer_cond: Condvar,
reader_cond: Condvar,
}
impl Queue {
fn new(capacity: usize) -> Self {
Queue {
queue: Mutex::new(VecDeque::new()),
capacity: capacity,
writer_cond: Condvar::new(),
reader_cond: Condvar::new(),
}
}
fn produce(&self, i: i32) {
let guard = self.queue.lock().unwrap();
let mut buffer = self
.writer_cond
.wait_while(guard, |q| q.len() == self.capacity)
.unwrap();
buffer.push_back(i);
drop(buffer);
self.reader_cond.notify_one();
}
fn consume(&self) -> i32 {
let guard = self.queue.lock().unwrap();
let mut buffer = self
.writer_cond
.wait_while(guard, |q| q.is_empty())
.unwrap();
let result = buffer.pop_front();
drop(buffer);
self.writer_cond.notify_one();
result.unwrap()
}
}