Code review: Using parking_lot::RawMutex

I've been writing some audio processing code recently with JACK, and had some trouble communicating between the main and realtime threads. In particular, I needed the realtime processing thread to be able to hold a Mutex across several callbacks so that the main thread can't accidentally interrupt the audio processing at a bad time.

In order to accomplish this, I wrote something with an API similar to std::sync::Mutex, except that the guard holds an Arc instead of a lifetimed reference. Does my implementation look correct, and is there an existing crate out there that does something similar?

use parking_lot::{RawMutex,lock_api::RawMutex as _};
use std::sync::Arc;
use std::cell::UnsafeCell;
use std::fmt::{Debug,Formatter};
use std::marker::PhantomData as PhD;
use crossbeam::sync::WaitGroup;

struct Inner<T:?Sized> {
    mutex: RawMutex,
    cell: UnsafeCell<(Option<WaitGroup>, T)>,
}

unsafe impl<T:?Sized + Send> Sync for Inner<T> {}
unsafe impl<T:?Sized + Send> Send for Inner<T> {}

/// A shared-ownership mutex that doesn't tie its guards to a
/// particular scope.
///
/// This allows the guards to be stored arbitrarily long, which
/// is useful for realtime processing: During JACK's transport
/// startup phase, the lock can be acquired and continue to be
/// held until the transport is finished.
pub struct UnscopedMutex<T:?Sized>(Arc<Inner<T>>);

impl<T:?Sized> Clone for UnscopedMutex<T> {
    fn clone(&self)->Self {
        UnscopedMutex(Arc::clone(&self.0))
    }
}

impl<T:?Sized> Debug for UnscopedMutex<T> {
    fn fmt(&self, f: &mut Formatter<'_>)->Result<(), std::fmt::Error> {
        f.write_fmt(format_args!("UnscopedMutex<{}>", std::any::type_name::<T>()))
    }
}

/// Exclusive permission to access the contents of an UnscopedMutex.
///
/// The Mutex will be kept alive as long as the guard exists, even if
/// all of the other copies are dropped.
pub struct UnscopedGuard<T:?Sized>(
    Arc<Inner<T>>,
    PhD<<RawMutex as parking_lot::lock_api::RawMutex>::GuardMarker>
);

impl<T:?Sized+Debug> Debug for UnscopedGuard<T> {
    fn fmt(&self, f: &mut Formatter<'_>)->Result<(), std::fmt::Error> {
        (&**self).fmt(f)
    }
}

impl<T:?Sized> UnscopedMutex<T> {
    pub fn new(init:T)->Self where T:Sized {
        UnscopedMutex(Arc::new(Inner {
            mutex: <RawMutex as parking_lot::lock_api::RawMutex>::INIT,
            cell: UnsafeCell::new((None, init))
        }))
    }

    /// Block the current thread until the Mutex is available, and then
    /// return access to its contents.
    pub fn lock(&self)->UnscopedGuard<T> {
        self.0.mutex.lock();
        unsafe { self.build_guard() }
    }

    /// Attempt to acquire the Mutex immediately.  If it is not available,
    /// returns without blocking the thread.
    pub fn try_lock(&self)->Option<UnscopedGuard<T>> {
        match self.0.mutex.try_lock() {
            true => Some(unsafe { self.build_guard() }),
            false => None
        }
    }

    /// Safety: mutex must be locked and unguarded
    unsafe fn build_guard(&self)->UnscopedGuard<T> {
        // Notify waiters that the lock has been taken
        (*self.0.cell.get()).0 = None;

        UnscopedGuard(Arc::clone(&self.0), PhD)
    }
}


impl<T:?Sized> Drop for UnscopedGuard<T> {
    fn drop(&mut self) {
        unsafe { self.0.mutex.unlock(); }
    }
}

impl<T:?Sized> std::ops::Deref for UnscopedGuard<T> {
    type Target = T;
    fn deref(&self)->&T {
        unsafe { & (*self.0.cell.get()).1 }
    }
}

impl<T:?Sized> std::ops::DerefMut for UnscopedGuard<T> {
    fn deref_mut(&mut self)->&mut T {
        unsafe { &mut (*self.0.cell.get()).1 }
    }
}

impl<T:?Sized> UnscopedGuard<T> {
    /// Release a guard and wait until another thread takes the lock.
    ///
    /// This can be used as a kind of condvar, blocking until there is
    /// a chance for some change to have been made.
    pub fn release_wait(this:Self) {
        let wait = WaitGroup::new();
        unsafe { (*this.0.cell.get()).0 = Some(wait.clone()); }
        std::mem::drop(this);
        wait.wait();
    }
}

It’s somewhat poorly documented on the docs.rs page, but lock_api has an arc_lock feature that features API like

impl<R: RawMutex, T: ?Sized> Mutex<R, T>

pub fn lock_arc(self: &Arc<Self>) -> ArcMutexGuard<R, T>

Acquires a lock through an Arc.

This method is similar to the lock method; however, it requires the Mutex to be inside of an Arc and the resulting mutex guard has no lifetime requirements.

6 Likes

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.