I've been writing some audio processing code with JACK recently, and had some trouble communicating between the main and realtime threads. In particular, I needed the realtime processing thread to be able to hold a Mutex across several callbacks, so that the main thread can't accidentally interrupt the audio processing at a bad time.
In order to accomplish this, I wrote something with an API similar to std::sync::Mutex, except that the guard holds an Arc instead of a lifetimed reference. Does my implementation look correct, and is there an existing crate out there that does something similar?
use parking_lot::{lock_api::RawMutex as _, RawMutex};
use std::sync::Arc;
use std::cell::UnsafeCell;
use std::fmt::{Debug, Formatter};
use std::marker::PhantomData as PhD;
use crossbeam::sync::WaitGroup;
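struct Inner<T: ?Sized> {
    mutex: RawMutex,
    // Slot 0 holds the WaitGroup used by release_wait; slot 1 holds the data.
    cell: UnsafeCell<(Option<WaitGroup>, T)>,
}
// These mirror the bounds on std::sync::Mutex<T>.
unsafe impl<T: ?Sized + Send> Sync for Inner<T> {}
unsafe impl<T: ?Sized + Send> Send for Inner<T> {}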
/// A shared-ownership mutex that doesn't tie its guards to a
/// particular scope.
///
/// This allows the guards to be stored arbitrarily long, which
/// is useful for realtime processing: During JACK's transport
/// startup phase, the lock can be acquired and continue to be
/// held until the transport is finished.
pub struct UnscopedMutex<T: ?Sized>(Arc<Inner<T>>);
impl<T: ?Sized> Clone for UnscopedMutex<T> {
    fn clone(&self) -> Self {
        UnscopedMutex(Arc::clone(&self.0))
    }
}
impl<T: ?Sized> Debug for UnscopedMutex<T> {
    fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), std::fmt::Error> {
        f.write_fmt(format_args!("UnscopedMutex<{}>", std::any::type_name::<T>()))
    }
}
/// Exclusive permission to access the contents of an UnscopedMutex.
///
/// The Mutex will be kept alive as long as the guard exists, even if
/// all of the other copies are dropped.
pub struct UnscopedGuard<T: ?Sized>(
    Arc<Inner<T>>,
    PhD<<RawMutex as parking_lot::lock_api::RawMutex>::GuardMarker>,
);
impl<T: ?Sized + Debug> Debug for UnscopedGuard<T> {
    fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), std::fmt::Error> {
        (&**self).fmt(f)
    }
}
impl<T: ?Sized> UnscopedMutex<T> {
    pub fn new(init: T) -> Self where T: Sized {
        UnscopedMutex(Arc::new(Inner {
            mutex: <RawMutex as parking_lot::lock_api::RawMutex>::INIT,
            cell: UnsafeCell::new((None, init)),
        }))
    }
    /// Block the current thread until the Mutex is available, and then
    /// return access to its contents.
    pub fn lock(&self) -> UnscopedGuard<T> {
        self.0.mutex.lock();
        // SAFETY: the raw mutex was just acquired and has no guard yet.
        unsafe { self.build_guard() }
    }
    /// Attempt to acquire the Mutex immediately. If it is not available,
    /// returns without blocking the thread.
    pub fn try_lock(&self) -> Option<UnscopedGuard<T>> {
        if self.0.mutex.try_lock() {
            // SAFETY: try_lock returned true, so this call owns the lock.
            Some(unsafe { self.build_guard() })
        } else {
            None
        }
    }
    /// Safety: mutex must be locked and unguarded
    unsafe fn build_guard(&self) -> UnscopedGuard<T> {
        // Notify waiters that the lock has been taken
        (*self.0.cell.get()).0 = None;
        UnscopedGuard(Arc::clone(&self.0), PhD)
    }
}
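impl<T: ?Sized> Drop for UnscopedGuard<T> {
    fn drop(&mut self) {
        // SAFETY: a guard only exists while the raw mutex is locked, and
        // this is the only place that unlocks it.
        unsafe { self.0.mutex.unlock(); }
    }
}
impl<T: ?Sized> std::ops::Deref for UnscopedGuard<T> {
    type Target = T;
    fn deref(&self) -> &T {
        // SAFETY: holding the guard means the mutex is locked.
        unsafe { &(*self.0.cell.get()).1 }
    }
}
impl<T: ?Sized> std::ops::DerefMut for UnscopedGuard<T> {
    fn deref_mut(&mut self) -> &mut T {
        // SAFETY: holding the guard means the mutex is locked.
        unsafe { &mut (*self.0.cell.get()).1 }
    }
}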
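impl<T: ?Sized> UnscopedGuard<T> {
    /// Release a guard and wait until another thread takes the lock.
    ///
    /// This can be used as a kind of condvar, blocking until there is
    /// a chance for some change to have been made.
    pub fn release_wait(this: Self) {
        let wait = WaitGroup::new();
        // SAFETY: `this` still holds the lock, so the cell can be written.
        // The stored clone is dropped by the next call to build_guard.
        unsafe { (*this.0.cell.get()).0 = Some(wait.clone()); }
        std::mem::drop(this);
        // Blocks until every other clone of the WaitGroup has been dropped.
        wait.wait();
    }
}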
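
To make the intended usage pattern easier to discuss, here is a minimal std-only sketch of the same "guard owns an Arc" idea. It is not the code above: it assumes a plain AtomicBool flag in place of parking_lot's RawMutex, offers only a non-blocking try_lock, and leaves out release_wait entirely. The names Shared, OwnedLock, OwnedGuard and Processor are hypothetical, and Processor is only a stand-in for a realtime handler, not part of any JACK API.

```rust
use std::cell::UnsafeCell;
use std::marker::PhantomData;
use std::ops::{Deref, DerefMut};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

// Hypothetical shared state: a lock flag plus the protected value.
struct Shared<T> {
    locked: AtomicBool,
    value: UnsafeCell<T>,
}

// Same bounds that std::sync::Mutex<T> uses.
unsafe impl<T: Send> Send for Shared<T> {}
unsafe impl<T: Send> Sync for Shared<T> {}

pub struct OwnedLock<T>(Arc<Shared<T>>);

impl<T> Clone for OwnedLock<T> {
    fn clone(&self) -> Self {
        OwnedLock(Arc::clone(&self.0))
    }
}

// The guard owns an Arc, so it has no lifetime parameter and can be stored
// in a struct field. The raw-pointer marker keeps it !Send and !Sync, which
// is a deliberately conservative choice for this sketch.
pub struct OwnedGuard<T> {
    shared: Arc<Shared<T>>,
    _marker: PhantomData<*mut ()>,
}

impl<T> OwnedLock<T> {
    pub fn new(value: T) -> Self {
        OwnedLock(Arc::new(Shared {
            locked: AtomicBool::new(false),
            value: UnsafeCell::new(value),
        }))
    }

    // Non-blocking acquire: succeeds only if the flag flips false -> true.
    pub fn try_lock(&self) -> Option<OwnedGuard<T>> {
        self.0
            .locked
            .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
            .ok()
            .map(|_| OwnedGuard { shared: Arc::clone(&self.0), _marker: PhantomData })
    }
}

impl<T> Drop for OwnedGuard<T> {
    fn drop(&mut self) {
        // Release the flag so the next try_lock can succeed.
        self.shared.locked.store(false, Ordering::Release);
    }
}

impl<T> Deref for OwnedGuard<T> {
    type Target = T;
    fn deref(&self) -> &T {
        // SAFETY: the guard exists only while the flag is held.
        unsafe { &*self.shared.value.get() }
    }
}

impl<T> DerefMut for OwnedGuard<T> {
    fn deref_mut(&mut self) -> &mut T {
        // SAFETY: the guard exists only while the flag is held.
        unsafe { &mut *self.shared.value.get() }
    }
}

// Hypothetical handler that keeps the guard across several callbacks.
pub struct Processor {
    lock: OwnedLock<Vec<f32>>,
    held: Option<OwnedGuard<Vec<f32>>>,
}

impl Processor {
    // Called once when processing starts; reports whether the lock is held.
    pub fn start(&mut self) -> bool {
        if self.held.is_none() {
            self.held = self.lock.try_lock();
        }
        self.held.is_some()
    }

    // Called repeatedly; writes through the stored guard without relocking.
    pub fn process(&mut self, sample: f32) {
        if let Some(buf) = self.held.as_mut() {
            buf.push(sample);
        }
    }

    // Called once when processing ends; dropping the guard unlocks.
    pub fn stop(&mut self) {
        self.held = None;
    }
}
```

In this sketch, a Processor built with `held: None` takes the lock in start(), keeps writing through the same guard in each process() call, and releases it in stop(). While the guard is stored, try_lock on any other clone of the OwnedLock returns None, which is the property I want the main thread to observe.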