I have a library called hopper, similar to stdlib's MPSC channel. The difference is that hopper can buffer to disk if in-memory storage grows too large: unbounded queued elements in bounded memory. The current stable version works, but it isn't the fastest thing on the planet: senders and the receiver compete for a single mutex.
Lately I've been working to break up this exclusive arrangement and move toward a system where all the senders synchronize on one end of a deque and the receiver synchronizes on the other. I feel like I'm really close to making this happen, but I've worked myself into a corner and I'm kind of stuck. Hopefully someone here will see a way out I've missed. So, let me walk you through what I've got now. For reference, the commit I'm discussing is 34d40f575761332b2a5dcb76b027f334e5acf4f6.
Okay, the deque that's lock-protected on either end has an odd interface. For instance, enqueuing a T is push_back(&self, elem: T, guard: &mut MutexGuard<BackGuardInner<S>>) -> Result<bool, Error<T>>. There's a lot going on here. elem: T is the thing we're going to move into the deque. The bool in the Ok case of the return is a flag for whether a stuck receiver needs to be notified -- via a Condvar -- that it's free to start processing again. The elem: T is moved back to the caller in the error case; more on that in a second. The implementation of push_back is fairly small, if, uh, pointerific:
    pub unsafe fn push_back(
        &self,
        elem: T,
        guard: &mut MutexGuard<BackGuardInner<S>>,
    ) -> Result<bool, Error<T>> {
        let mut must_wake_dequeuers = false;
        if !(*self.data.offset((*guard).offset)).is_null() {
            return Err(Error::Full(elem));
        } else {
            *self.data.offset((*guard).offset) = Box::into_raw(Box::new(elem));
            (*guard).offset += 1;
            (*guard).offset %= self.capacity as isize;
            if self.size.fetch_add(1, Ordering::Release) == 0 {
                must_wake_dequeuers = true;
            };
        }
        return Ok(must_wake_dequeuers);
    }
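For context, the handshake that must_wake_dequeuers feeds -- wake the receiver only on the empty-to-non-empty transition, so senders usually skip the Condvar entirely -- can be sketched in miniature. Everything below (Notifier, push, pop) is a hypothetical illustration, not hopper's types:

```rust
use std::sync::{Condvar, Mutex};

// Miniature of the wake protocol: a sender reports whether the queue
// just went from empty to non-empty; only then does anyone pay for a
// Condvar signal. (Hypothetical names, not hopper's API.)
struct Notifier {
    size: Mutex<usize>,
    not_empty: Condvar,
}

impl Notifier {
    // Returns true exactly when this push made the queue non-empty,
    // i.e. when a parked receiver might need waking.
    fn push(&self) -> bool {
        let mut size = self.size.lock().unwrap();
        *size += 1;
        *size == 1
    }

    fn notify(&self) {
        self.not_empty.notify_one();
    }

    // The receiver parks on the Condvar while the queue is empty.
    fn pop(&self) -> usize {
        let mut size = self.size.lock().unwrap();
        while *size == 0 {
            size = self.not_empty.wait(size).unwrap();
        }
        *size -= 1;
        *size
    }
}

fn main() {
    let n = Notifier { size: Mutex::new(0), not_empty: Condvar::new() };
    assert!(n.push()); // empty -> non-empty: wake needed
    assert!(!n.push()); // already non-empty: no wake needed
    n.notify();
    assert_eq!(n.pop(), 1);
    assert_eq!(n.pop(), 0);
}
```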
Why does a MutexGuard<BackGuardInner<S>> get passed as a reference to the function? Let's take a look at part of the main caller of push_back, hopper::Sender::send:
    pub fn send(&mut self, event: T) {
        let mut back_guard = self.mem_buffer.lock_back();
        let placed_event = private::Placement::Memory(event);
        match self.mem_buffer.push_back(placed_event, &mut back_guard) {
            Ok(must_wake_receiver) => {
                if must_wake_receiver {
                    let front_guard = self.mem_buffer.lock_front();
                    self.mem_buffer.notify_not_empty(&front_guard);
                    drop(front_guard);
                }
            }
            Err(deque::Error::Full(placed_event)) => {
Because hopper keeps memory bounded, it's possible for the send of an event to fail, signaling that we need to do something to get that event to disk. The approach taken is to pop the current back element off the in-memory buffer and fiddle with it based on its 'placement' status, which, uh, I guess is beside the point for this question, but if you want to have a look, dig in here or thereabouts. (This is not entirely reflected in-code. Just FYI.)
Anyhow, the trouble I'm in is that pushing and popping require a mutable borrow of the deque, as does getting the appropriate locks in the first place. The compiler is rightfully indignant:
error[E0499]: cannot borrow `self.mem_buffer` as mutable more than once at a time
--> src/sender.rs:193:15
|
191 | let mut back_guard = self.mem_buffer.lock_back();
| --------------- first mutable borrow occurs here
192 | let placed_event = private::Placement::Memory(event);
193 | match self.mem_buffer.push_back(placed_event, &mut back_guard) {
| ^^^^^^^^^^^^^^^ second mutable borrow occurs here
...
242 | }
| - first borrow ends here
error[E0499]: cannot borrow `self.mem_buffer` as mutable more than once at a time
--> src/sender.rs:196:39
|
191 | let mut back_guard = self.mem_buffer.lock_back();
| --------------- first mutable borrow occurs here
...
196 | let front_guard = self.mem_buffer.lock_front();
| ^^^^^^^^^^^^^^^ second mutable borrow occurs here
...
242 | }
| - first borrow ends here
error[E0499]: cannot borrow `self.mem_buffer` as mutable more than once at a time
--> src/sender.rs:197:21
|
191 | let mut back_guard = self.mem_buffer.lock_back();
| --------------- first mutable borrow occurs here
...
197 | self.mem_buffer.notify_not_empty(&front_guard);
| ^^^^^^^^^^^^^^^ second mutable borrow occurs here
...
242 | }
| - first borrow ends here
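One way to read these errors: because lock_back and friends take &mut self, the live back_guard counts as a first mutable borrow of mem_buffer, and every later method call is a second. In miniature -- MiniQueue below is a hypothetical stand-in, not hopper's type -- the same guard-passing shape compiles once every method takes &self and the mutexes do the interior mutability:

```rust
use std::sync::{Mutex, MutexGuard};

// Hypothetical miniature of the guard-passing pattern. The queue owns
// its lock and mutation methods borrow the guard back, but because the
// methods take &self, a live guard is only a shared borrow of the queue.
struct MiniQueue<T> {
    back: Mutex<usize>, // stands in for BackGuardInner<S>: the write offset
    slots: Mutex<Vec<Option<T>>>,
}

impl<T> MiniQueue<T> {
    fn new(capacity: usize) -> Self {
        MiniQueue {
            back: Mutex::new(0),
            slots: Mutex::new((0..capacity).map(|_| None).collect()),
        }
    }

    fn lock_back(&self) -> MutexGuard<usize> {
        self.back.lock().unwrap()
    }

    // &self plus the guard: two shared borrows, so no E0499.
    // On a full slot the element is handed back, as in hopper.
    fn push_back(&self, elem: T, guard: &mut MutexGuard<usize>) -> Result<bool, T> {
        let mut slots = self.slots.lock().unwrap();
        let offset = **guard;
        if slots[offset].is_some() {
            return Err(elem);
        }
        let was_empty = slots.iter().all(|s| s.is_none());
        slots[offset] = Some(elem);
        **guard = (offset + 1) % slots.len();
        Ok(was_empty)
    }
}

fn main() {
    let q: MiniQueue<u32> = MiniQueue::new(2);
    let mut back = q.lock_back();
    // Both calls borrow q immutably while `back` is alive.
    assert_eq!(q.push_back(1, &mut back), Ok(true));
    assert_eq!(q.push_back(2, &mut back), Ok(false));
    assert_eq!(q.push_back(3, &mut back), Err(3)); // full: elem comes back
}
```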
I considered moving the locks out of the deque entirely, but the trouble there is: how would I then implement Queue<T>::pop_front? That's the function where our condition variable makes its appearance, and its signaler is the sender's send. I also considered making the lock functions consume a closure in which I'd do the meat of the operation. But I think this will run into problems too: back_lock has to be held for the duration of Sender::send, while front_lock is held only to notify the Receiver that more writes are ready. That's a double mutation right there.
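To make that concrete, here's a rough sketch of the closure-consuming API I have in mind. The names (with_back, with_front) are invented, the payload is a bare offset, and this glosses over the Condvar in pop_front entirely:

```rust
use std::sync::Mutex;

// Hypothetical closure-consuming lock API: the lock is acquired and
// released inside with_back/with_front, so callers never hold a guard
// that borrows the deque.
struct Deque {
    back: Mutex<isize>,
    front: Mutex<isize>,
}

impl Deque {
    // The back lock is held for exactly the lifetime of the closure,
    // so a send-length critical section is just a longer closure.
    fn with_back<R>(&self, f: impl FnOnce(&mut isize) -> R) -> R {
        f(&mut *self.back.lock().unwrap())
    }

    fn with_front<R>(&self, f: impl FnOnce(&mut isize) -> R) -> R {
        f(&mut *self.front.lock().unwrap())
    }
}

fn main() {
    let d = Deque { back: Mutex::new(0), front: Mutex::new(0) };
    // Because with_front takes &self, it can be called from inside the
    // with_back closure: the brief front critical section nests inside
    // the send-length back critical section.
    let must_wake = d.with_back(|offset| {
        *offset += 1;
        d.with_front(|_front| true)
    });
    assert!(must_wake);
    assert_eq!(d.with_back(|offset| *offset), 1);
}
```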
Anyhow, I've worked myself into a real corner for sure. Anyone have an idea how I could work my way out of it?
--
TL;DR: I have a structure that holds mutexes to protect itself, and functions on that structure take MutexGuards borrowed from the structure in order to mutate it. It's a nasty loop that I'm not sure how to break.