Does this synchronization mechanism look okay for an SPSC queue?


#1

https://play.rust-lang.org/?gist=cf3c7e1e76280389812cdbefe3a417e3&version=stable

SPSC queue sharing a VecDeque.

The sender needs to notify a blocked receiver on send and drop:

fn send (&self, t : T) -> Result <(), SendError <T>> {
  if self.inner.connected.load (std::sync::atomic::Ordering::SeqCst) {
    self.inner.queue.lock().unwrap().push_back (t);
    self.inner.counter.fetch_add (1, std::sync::atomic::Ordering::SeqCst);
    {
      let _wait_lock = self.inner.wait_lock.lock().unwrap();
      self.inner.wait_cvar.notify_one();
    }
    Ok (())
  } else {
    Err (SendError (t))
  }
}

...

fn drop (&mut self) {
  self.inner.connected.store (false, std::sync::atomic::Ordering::SeqCst);
  {
    let _wait_lock = self.inner.wait_lock.lock().unwrap();
    self.inner.wait_cvar.notify_one();
  }
}

and the blocking recv function:

fn recv (&self) -> Result <T, RecvError> {
  match self.try_recv() {
    Ok (t) => Ok (t),
    Err (TryRecvError::Empty) => {
      {
        let mut _wait_lock = self.inner.wait_lock.lock().unwrap();
        _wait_lock = self.inner.wait_cvar.wait (_wait_lock).unwrap();
      }
      match self.try_recv() {
        Ok (t) => Ok (t),
        Err (TryRecvError::Empty) => unreachable!(
          "blocked receiver was notified, there should either be a pending \
            message or else the channel was closed"),
        Err (TryRecvError::Disconnected) => Err (RecvError)
      }
    },
    Err (TryRecvError::Disconnected) => {
      Err (RecvError)
    }
  }
}

The stream (spsc) “flavor” of the standard library mpsc channels is a bit more involved, using transmutes to store an Arc (Thread, AtomicBool) in an AtomicUsize field, and some extra counters, like the “steals” counter. I figured a simpler synchronization mechanism would be possible, but I don’t know if the above is too simple.

At first I had a boolean in the mutex, but I couldn’t figure out the correct logic. Switching to a Mutex<()> still works, but now I’m calling notify_one on every send operation, which may be wasteful if the receiver is not waiting?

Edit

Apparently this attempt at synchronization is not good enough. Performing some benchmarks, after running about 30 times eventually hit the “unreachable” code path.


#2

I think this works, although the scope of the lock has grown to cover most of the send/recv functions:

https://play.rust-lang.org/?gist=6b803e4a2c01463b9377d224c06d300f&version=undefined

non-blocking send:

fn send (&self, t : T) -> Result <(), SendError <T>> {
  if self.inner.connected.load (std::sync::atomic::Ordering::SeqCst) {
    // wait lock acquire
    let wait_lock = self.inner.wait_lock.lock().unwrap();
    self.inner.queue.lock().unwrap().push_back (t);
    let _count
      = self.inner.counter.fetch_add (1, std::sync::atomic::Ordering::SeqCst);
    if *wait_lock {
      self.inner.wait_cvar.notify_one();
    }
    // wait lock release
    Ok (())
  } else {
    Err (SendError (t))
  }
}

blocking recv:

fn recv (&self) -> Result <T, RecvError> {
  // wait lock acquire
  let mut wait_lock = self.inner.wait_lock.lock().unwrap();
  match self.try_recv() {
    Ok (t) => Ok (t),
    Err (TryRecvError::Empty) => {
      *wait_lock = true;
      wait_lock = self.inner.wait_cvar.wait (wait_lock).unwrap();
      *wait_lock = false;
      { // wait lock early release
        let _ = wait_lock;
      }

      match self.try_recv() {
        Ok (t) => Ok (t),
        Err (TryRecvError::Empty) => unreachable!(
          "blocked receiver was notified, there should either be a pending \
            message or else the channel was closed"),
        Err (TryRecvError::Disconnected) => Err (RecvError)
      }
    },
    Err (TryRecvError::Disconnected) => {
      Err (RecvError)
    }
  }
  // wait lock release
}

#3

Note that this function is susceptible to spurious wakeups.
https://doc.rust-lang.org/std/sync/struct.Condvar.html#method.wait


#4

Oh okay that must be why the wait call in the example was inside a while loop, I was wondering why it wasn’t just an “if” statement.

Park and unpark does seem more natural (it doesn’t have the problem of spurious wake ups?), but there’s the problem of getting the thread handle to the other thread to call unpark, which is what the standard library mpsc implementation does through transmuting an Arc to an AtomicUsize. Not sure I want to go there…


#5

The thread::park function blocks the current thread unless or until the token is available for its thread handle, at which point it atomically consumes the token. It may also return spuriously, without consuming the token.
https://doc.rust-lang.org/std/thread/fn.park.html