SPSC queue sharing a VecDeque.
The sender needs to notify a blocked receiver on send and drop:
fn send (&self, t : T) -> Result <(), SendError <T>> {
if self.inner.connected.load (std::sync::atomic::Ordering::SeqCst) {
self.inner.queue.lock().unwrap().push_back (t);
self.inner.counter.fetch_add (1, std::sync::atomic::Ordering::SeqCst);
{
let _wait_lock = self.inner.wait_lock.lock().unwrap();
self.inner.wait_cvar.notify_one();
}
Ok (())
} else {
Err (SendError (t))
}
}
...
fn drop (&mut self) {
self.inner.connected.store (false, std::sync::atomic::Ordering::SeqCst);
{
let _wait_lock = self.inner.wait_lock.lock().unwrap();
self.inner.wait_cvar.notify_one();
}
}
and the blocking recv function:
fn recv (&self) -> Result <T, RecvError> {
match self.try_recv() {
Ok (t) => Ok (t),
Err (TryRecvError::Empty) => {
{
let mut _wait_lock = self.inner.wait_lock.lock().unwrap();
_wait_lock = self.inner.wait_cvar.wait (_wait_lock).unwrap();
}
match self.try_recv() {
Ok (t) => Ok (t),
Err (TryRecvError::Empty) => unreachable!(
"blocked receiver was notified, there should either be a pending \
message or else the channel was closed"),
Err (TryRecvError::Disconnected) => Err (RecvError)
}
},
Err (TryRecvError::Disconnected) => {
Err (RecvError)
}
}
}
The stream (spsc) "flavor" of the standard library mpsc channels is a bit more involved, using transmutes to store an Arc (Thread, AtomicBool)
in an AtomicUsize field, and some extra counters, like the "steals" counter. I figured a simpler synchronization mechanism would be possible, but I don't know if the above is too simple.
At first I had a boolean in the mutex, but I couldn't figure out the correct logic. Switching to a Mutex<()>
still works, but now I'm calling notify_one
on every send operation, which may be wasteful if the receiver is not waiting?
Edit
Apparently this attempt at synchronization is not good enough. Performing some benchmarks, after running about 30 times eventually hit the "unreachable" code path.