Semaphore=Mutex+CondVar (Concurrency wizards needed)

Introduction

Semaphores are a vital concurrency device. It is well known that you can build one from a mutex and a condition variable. I am attempting to do just that for learning purposes.

I would like to get some validation of my solution.

Objectives

  • Is my implementation correct? Why not?
  • What is missing for it to be production ready¹?
    • As I say in the docs of the snippet, the solution is not "fair". What's your take here?
  • Any other comments you find relevant?

Please, don't hold back.

¹: I know I do not take into account overflow of the counters. Let's pretend this is not an issue.

Attempt 1 - Incorrect IMO

I use this first attempt to set some context, and justify the next approach.

Attempt-1, Wrong
use std::sync::{Mutex, Condvar};
use std::ops::{Deref, DerefMut};

/// This implementation is not 100% correct. It does not account for
///
/// 1. Spurious wakes.
///
pub struct Semaphore {
    mutex: Mutex<i16>,
    cond_var: Condvar,
}

impl Semaphore {
    pub fn new(size: u16) -> Semaphore {
        if size == 0 {
            panic!("Semaphore size must be greater than 0.")
        }
        Semaphore {
            mutex: Mutex::new(size as i16),
            cond_var: Condvar::new(),
        }
    }

    pub fn decrement(&self) {
        let mut mutex_guard = self.mutex.lock().unwrap();
        let counter = mutex_guard.deref_mut();
        *counter -= 1;

        if *mutex_guard.deref() < 0 {
            mutex_guard = self.cond_var.wait(mutex_guard).unwrap();
        }
    }

    pub fn increment(&self) {
        let mut mutex_guard = self.mutex.lock().unwrap();
        let counter = mutex_guard.deref_mut();
        *counter += 1;

        if *mutex_guard.deref() <= 0 {
            self.cond_var.notify_one();
        }
    }
}

As I see it, this implementation suffers from:

Spurious wakes

The call self.cond_var.wait(mutex_guard).unwrap() in the decrement method can return because of a spurious wake, which might let more threads into the semaphore than expected.

Final - Where I want your criticism

Is this solution production ready? (playground link below)
use std::sync::{Mutex, Condvar};
use std::ops::{Deref, DerefMut};

/// State protected by the mutex that controls the behaviour of the semaphore.
///
///
///
/// ## Parameters
///
/// `passes`: One pass represents a sort of permit to enter the semaphore. It is not possible
///  for this parameter to be negative (i.e., < 0).
///
/// `counter`: Represents the number of available spots in the Semaphore. It takes into account
/// the number of passes "not redeemed".
/// If `counter` is greater than 0, then it represents how many threads would be able to enter
/// the semaphore without blocking. Additional threads might also be able to enter, depending on
/// the value of `passes`.
/// A `counter` of 0 means any new thread will block when trying to enter, and no threads are
/// waiting to enter *without* a pass. But there might be threads waiting to enter with `passes`.
/// If `counter` is lower than 0, then it represents how many threads are already blocked waiting
/// to enter. There might also be additional threads blocked waiting to enter, which are represented
/// by a positive value of `passes`.
///
///
struct State {
    counter: i16,
    passes: u16,
}

impl State {
    fn new(counter: i16, passes: u16) -> State {
        State {
            counter,
            passes,
        }
    }
}

pub struct Semaphore {
    mutex: Mutex<State>,
    cond_var: Condvar,
}

/// On fairness
/// One thing that can happen, although it does not seem to be a problem, is a permit being
/// stolen by a new thread. That is, threads are waiting when a spot opens up in the semaphore
/// because a thread leaves. You might expect one of the CURRENTLY waiting threads to
/// enter the semaphore; not necessarily. A new thread, not currently waiting, might win contention
/// for the lock.
///
/// Initial                  X,X|X,X,X : counter=-2,passes=0
/// One thread leaves        X,X|O,X,X : counter=-1,passes=1
/// A NEW thread enters      X,X|X,X,X : counter=-2,passes=0
///
impl Semaphore {
    pub fn new(size: u16) -> Semaphore {
        if size == 0 {
            panic!("Semaphore size must be greater than 0.")
        }
        Semaphore {
            mutex: Mutex::new(State::new(size as i16, 0)),
            cond_var: Condvar::new(),
        }
    }

    pub fn decrement(&self) {
        let mut mutex_guard = self.mutex.lock().unwrap();
        let State { counter, .. } = mutex_guard.deref_mut();
        *counter -= 1;

        if *counter < 0 {
            while (*mutex_guard.deref()).passes == 0 { // now it is guarded against spurious wakes
                mutex_guard = self.cond_var.wait(mutex_guard).unwrap();
            }
            let State { passes, .. } = mutex_guard.deref_mut();
            *passes -= 1;
        }
    }

    pub fn increment(&self) {
        let mut mutex_guard = self.mutex.lock().unwrap();
        let State { counter, passes } = mutex_guard.deref_mut();
        *counter += 1;

        if *counter <= 0 {
            *passes += 1;
            self.cond_var.notify_one();
        }
    }
}

I think the Rust doc comments on the code block are sufficient and don't require further explanation.

In particular notice:

  • The introduction of the passes variable, which was ultimately needed to solve the spurious wake problem.

Playground link: Rust Playground

Intuitively, I would’ve done something like

use std::sync::{Condvar, Mutex};

pub struct Semaphore {
    mutex: Mutex<usize>,
    cond_var: Condvar,
}

impl Semaphore {
    pub fn new(initial_size: usize) -> Semaphore {
        Semaphore {
            mutex: Mutex::new(initial_size),
            cond_var: Condvar::new(),
        }
    }

    pub fn decrement(&self) {
        let guard = self.mutex.lock().unwrap();
        let mut guard = self.cond_var.wait_while(guard, |c| *c == 0).unwrap();

        *guard -= 1;
    }

    pub fn increment(&self) {
        let mut guard = self.mutex.lock().unwrap();
        *guard = guard.checked_add(1).unwrap();

        self.cond_var.notify_one();
    }
}

but I don’t know whether or not that’s correct.

I haven’t fully understood your final solution yet.

I came up with roughly the same solution as @steffahn, except that I use a guard object to manage the counts. This ensures that every decrement is paired with an increment:

use std::sync::{Mutex,Condvar};

pub struct Semaphore<T:?Sized> {
    available: Mutex<usize>,
    condvar: Condvar,
    guarded: T,
}

pub struct Permit<'a, T:?Sized>(&'a Semaphore<T>);

impl<T:?Sized> Semaphore<T> {
    pub fn new(limit:usize, guarded:T)->Self where T:Sized {
        Semaphore {
            available: Mutex::new(limit),
            condvar: Condvar::new(),
            guarded
        }
    }
    
    pub fn borrow(&self)->Permit<'_,T> {
        let mut guard = self.available.lock().unwrap();
        while *guard == 0 {
            guard = self.condvar.wait(guard).unwrap();
        }
        *guard -= 1;
        Permit(self)
    }
}

impl<T:?Sized> Drop for Permit<'_,T> {
    fn drop(&mut self) {
        *(self.0.available.lock().unwrap()) += 1;
        self.0.condvar.notify_one();
    }
}

impl<T:?Sized> std::ops::Deref for Permit<'_,T> {
    type Target=T;
    fn deref(&self)->&T {
        &self.0.guarded
    }
}

Well, the wait_while API exists for a reason… you don’t have to write such loop yourself.

In my mind, it feels more appropriate to send out the notify_one while still holding on to the lock. I’m not sure, however, whether there are really any negative consequences from releasing the mutex first and then sending the notify, besides the possibility of causing some additional spurious wake-ups.
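
To make the two orderings concrete, here is a minimal sketch of both. The names `increment_locked`, `increment_unlocked`, `available` and `hand_over` are made up for this illustration and are not from any of the snippets above. Both variants should be correct as far as lost wake-ups go, because the waiter re-checks the counter under the mutex inside `wait_while`; any difference would only be in scheduling behaviour, which this sketch does not measure.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

/// Counting semaphore in the same style as the reply above.
pub struct Semaphore {
    permits: Mutex<usize>,
    cond_var: Condvar,
}

impl Semaphore {
    pub fn new(permits: usize) -> Self {
        Semaphore { permits: Mutex::new(permits), cond_var: Condvar::new() }
    }

    pub fn decrement(&self) {
        let guard = self.permits.lock().unwrap();
        // The predicate is always evaluated while holding the lock.
        let mut guard = self.cond_var.wait_while(guard, |p| *p == 0).unwrap();
        *guard -= 1;
    }

    /// Variant 1 (hypothetical name): notify while still holding the lock.
    pub fn increment_locked(&self) {
        let mut guard = self.permits.lock().unwrap();
        *guard += 1;
        self.cond_var.notify_one();
    } // the guard is dropped here, after the notification

    /// Variant 2 (hypothetical name): release the lock first, then notify.
    pub fn increment_unlocked(&self) {
        {
            let mut guard = self.permits.lock().unwrap();
            *guard += 1;
        } // the lock is released here, before the notification
        self.cond_var.notify_one();
    }

    /// Hypothetical helper used only to inspect the counter.
    pub fn available(&self) -> usize {
        *self.permits.lock().unwrap()
    }
}

/// Hypothetical driver: one waiter, one permit handed over with the chosen
/// variant. Returns the number of permits left once the waiter is done.
pub fn hand_over(unlock_first: bool) -> usize {
    let sem = Arc::new(Semaphore::new(0));
    let waiter = {
        let sem = Arc::clone(&sem);
        thread::spawn(move || sem.decrement())
    };
    if unlock_first {
        sem.increment_unlocked();
    } else {
        sem.increment_locked();
    }
    waiter.join().unwrap();
    sem.available()
}
```

In both cases the waiter either sees the incremented counter before it ever sleeps, or it is asleep and gets the notification, so `hand_over` returns with no permits left.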

Rust ≤ 1.8 apparently had an unstable Semaphore type in the standard library, implemented this way.

In terms of API design, there’s also the precedent of the (async) Semaphore of tokio.

Alternatively, you can be generic over the permit type and keep an actual queue in the mutex. If the permit type is zero-sized, this will behave like a classic semaphore, with VecDeque.len() serving as the semaphore count.

use std::sync::{Mutex,Condvar};
use std::collections::VecDeque;

pub struct ResourcePool<T> {
    condvar: Condvar,
    queue: Mutex<VecDeque<T>>,
}

pub struct Permit<'a, T> {
    pool: &'a ResourcePool<T>,
    item: Option<T>
}

impl<T> ResourcePool<T> {
    pub fn new(resources: impl IntoIterator<Item=T>)->Self {
        ResourcePool {
            queue: Mutex::new(VecDeque::from_iter(resources)),
            condvar: Condvar::new(),
        }
    }
    
    pub fn borrow(&self)->Permit<'_,T> {
        let item = self.condvar.wait_while(
            self.queue.lock().unwrap(),
            |queue| queue.is_empty()
        ).unwrap().pop_front();
        Permit { item, pool: self }
    }
    
    pub fn insert(&self, item:T) {
        let mut guard = self.queue.lock().unwrap();
        guard.push_back(item);
        self.condvar.notify_one();
    }
}

impl<T> Drop for Permit<'_,T> {
    fn drop(&mut self) {
        if let Some(item) = self.item.take() {
            self.pool.insert(item);
        }
    }
}

impl<T> Permit<'_,T> {
    /// Permanently remove this resource from the pool
    pub fn steal(mut this: Self)->T {
        this.item.take().unwrap()
    }
}

impl<T> std::ops::Deref for Permit<'_,T> {
    type Target=T;
    fn deref(&self)->&T {
        self.item.as_ref().unwrap()
    }
}

impl<T> std::ops::DerefMut for Permit<'_,T> {
    fn deref_mut(&mut self)->&mut T {
        self.item.as_mut().unwrap()
    }
}

(untested)

Damn it. Your solution is more elegant.
On decrement, you wait before decrementing. Smart. With that, there is no need for my extra passes variable. Well. Thanks.

Thanks also for the link to Rust's previous Semaphore implementation. 🙏

Slightly off-topic: what are your preferred concurrency books/resources, if you have any?

Sorry, I don’t have any resources for you. For answering your question, I’ve worked with knowledge from reading the documentation of Rust’s synchronization primitives (Mutex, Condvar), and maybe some knowledge from university courses I attended a few years ago played a role. I’ve also used a Condvar before in toy examples in Rust; I think I’ve used a C++ condition_variable before, too, in a university project, now that I’m thinking about it… and the relevant university courses I mentioned used wait+notify in Java, IIRC. (I don’t remember any details, but we might have even covered how to implement a semaphore in particular using those primitives in Java. Which is a different kind of problem though, I suppose – I don’t remember what the equivalent for the “Mutex” would’ve been.)

FYI in Java each object has a lock that you can lock with the synchronized keyword.

Indeed. synchronized blocks, or intrinsic locks as they are normally referred to, are Java's version of a mutex.

That's ok @steffahn . I asked since I am always keen on finding new concurrency resources.

Right, I do remember the monitors. I just don't remember whether or not they were involved in a semaphore implementation. (If we even did a semaphore implementation... I'm still not 100% on that one.)

Oh... they had a block form? Thanks for pointing that out. I somehow just remembered synchronized methods. 🙂 Well, synchronized blocks were probably involved then, after all. That kinda feels right/consistent with my vague memory.

Edit: In retrospect, forgetting about the blocks is a bit stupid, because I did remember that arbitrary objects could be used as monitors, and synchronized methods don't exactly allow you to arbitrarily choose the object to be used, do they?

As @SkiFire13 said, every Java object has an intrinsic lock that the synchronized keyword acquires. That is the core concept.
A method marked synchronized is syntactic sugar for wrapping its body in a synchronized block on the object instance the method is called on (or on the Class object, for a static method).
There are other "syntactic sugars", but ultimately they all map to some object.

Oh, I looked some things up now, turns out wait in Java only works inside of synchronized blocks/methods (for the same object) anyways. So they really are the same thing as condition variables. (Except, AFAICT, that they only make for a single condition variable per mutex.)

🤔 So is this becoming a complete description of Java's monitor system? Each object includes a pair consisting of one reentrant mutex and one condition variable operating on that mutex...

Edit: Wikipedia seems to confirm...“A monitor consists of a mutex (lock) object and condition variables.”
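
For comparison, here is a rough Rust sketch of that "one mutex plus one condition variable" pairing. `Monitor` and its `when` method are invented names for this illustration, not an existing API. One assumption to flag loudly: Rust's std Mutex is not reentrant, unlike Java's intrinsic lock, so the closures passed to `when` must not call back into the same monitor.

```rust
use std::sync::{Condvar, Mutex};

/// Hypothetical `Monitor`: a single mutex paired with a single condition
/// variable, loosely mirroring what every Java object carries.
pub struct Monitor<T> {
    state: Mutex<T>,
    changed: Condvar,
}

impl<T> Monitor<T> {
    pub fn new(state: T) -> Self {
        Monitor { state: Mutex::new(state), changed: Condvar::new() }
    }

    /// Roughly the Java idiom
    /// `synchronized (obj) { while (!ready) obj.wait(); action; obj.notifyAll(); }`.
    /// Not reentrant: `ready` and `action` must not use this monitor again.
    pub fn when<R>(
        &self,
        ready: impl Fn(&T) -> bool,
        action: impl FnOnce(&mut T) -> R,
    ) -> R {
        let guard = self.state.lock().unwrap();
        // Sleep until `ready` holds; the check is repeated after every wake-up.
        let mut guard = self.changed.wait_while(guard, |s| !ready(&*s)).unwrap();
        let result = action(&mut *guard);
        // Wake all waiters so each can re-check its own condition.
        self.changed.notify_all();
        result
    }
}

/// A semaphore expressed on top of the monitor sketch.
pub struct Semaphore(Monitor<usize>);

impl Semaphore {
    pub fn new(permits: usize) -> Self {
        Semaphore(Monitor::new(permits))
    }

    pub fn decrement(&self) {
        self.0.when(|p| *p > 0, |p| *p -= 1)
    }

    pub fn increment(&self) {
        self.0.when(|_| true, |p| *p += 1)
    }

    /// Hypothetical helper used only to inspect the counter.
    pub fn available(&self) -> usize {
        self.0.when(|_| true, |p| *p)
    }
}
```

With a single condition variable, `notify_all` is the safe default, since different waiters may be waiting for different conditions on the same state.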

New findings.

In Section 31.7 of the book Operating Systems: Three Easy Pieces, there is a semaphore implementation. It is in C, but it is equivalent to the approach posted by @steffahn (and the one he linked of Rust's now deprecated official implementation).

In that section, the author also states that:

  1. It matches the current Linux implementation.
  2. It is not strictly the definition put forward by Dijkstra, because the counter should be able to be negative and reflect the number of waiting threads. (although this has no real consequence as far as I can tell)

I would argue, then, that the implementation I posted is one that follows Dijkstra's definition, although it is more convoluted than @steffahn's.

I am still looking for someone keen on reviewing my implementation!

In college I have always been taught semaphores are bad. In the computer industry I have used Monitors many times (Java language), but never Semaphores. I believe it is hard to guarantee correctness with such an unconstrained API.

Monitors and condition variables

Usually it is desirable to separate specification concerns from implementation concerns. Your introduction of 'passes' might not be needed in a monitor setting, because spurious wakes are less of an issue there. At least, I do not recall ever having to care about spurious wakes with monitors.

One last thing about production-ready code... In production you will always have to let a client specify a timeout value for waiting. It is inconceivable to have a client blocked forever without being able to bail out.
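
As a rough sketch of what a timed wait could look like with the standard library, `Condvar::wait_timeout_while` combines the predicate loop with a deadline. The method name `decrement_timeout` is made up for this example, and the semaphore is the simple counter variant from earlier in the thread, not the original `passes` design.

```rust
use std::sync::{Condvar, Mutex};
use std::time::Duration;

pub struct Semaphore {
    permits: Mutex<usize>,
    cond_var: Condvar,
}

impl Semaphore {
    pub fn new(permits: usize) -> Self {
        Semaphore { permits: Mutex::new(permits), cond_var: Condvar::new() }
    }

    /// Hypothetical method: returns `true` if a permit was taken and `false`
    /// if `timeout` elapsed while no permit was available.
    pub fn decrement_timeout(&self, timeout: Duration) -> bool {
        let guard = self.permits.lock().unwrap();
        let (mut guard, result) = self
            .cond_var
            .wait_timeout_while(guard, timeout, |p| *p == 0)
            .unwrap();
        if result.timed_out() {
            // Still no permit when the deadline passed: give up without
            // touching the counter.
            return false;
        }
        *guard -= 1;
        true
    }

    pub fn increment(&self) {
        let mut guard = self.permits.lock().unwrap();
        *guard += 1;
        self.cond_var.notify_one();
    }
}
```

A caller can then decide what to do on `false`, for example retry, report an error, or shed load.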

Regarding whether semaphores are bad, I agree that for monitor-like workflows, a monitor-based API is better. However, there are use-cases where semaphores are the natural choice. For example, if you want to limit the number of in-flight requests in your web server, then a semaphore is the obvious choice. Another example is avoiding hitting the file descriptor limit. Another example is adding a capacity to an otherwise unbounded message passing channel.
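
A small sketch of the first use-case, limiting the amount of work in flight. Everything here is illustrative: `peak_in_flight` is a made-up driver, the "request" is just a short sleep, and the semaphore is the simple counter variant from earlier in the thread.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

pub struct Semaphore {
    permits: Mutex<usize>,
    cond_var: Condvar,
}

impl Semaphore {
    pub fn new(permits: usize) -> Self {
        Semaphore { permits: Mutex::new(permits), cond_var: Condvar::new() }
    }

    pub fn decrement(&self) {
        let guard = self.permits.lock().unwrap();
        let mut guard = self.cond_var.wait_while(guard, |p| *p == 0).unwrap();
        *guard -= 1;
    }

    pub fn increment(&self) {
        let mut guard = self.permits.lock().unwrap();
        *guard += 1;
        self.cond_var.notify_one();
    }
}

/// Hypothetical driver: runs `jobs` simulated requests with at most `limit`
/// of them in flight, and returns the highest concurrency it observed.
pub fn peak_in_flight(jobs: usize, limit: usize) -> usize {
    let sem = Arc::new(Semaphore::new(limit));
    let in_flight = Arc::new(AtomicUsize::new(0));
    let peak = Arc::new(AtomicUsize::new(0));

    let handles: Vec<_> = (0..jobs)
        .map(|_| {
            let (sem, in_flight, peak) =
                (Arc::clone(&sem), Arc::clone(&in_flight), Arc::clone(&peak));
            thread::spawn(move || {
                sem.decrement(); // blocks while `limit` requests are running
                let now = in_flight.fetch_add(1, Ordering::SeqCst) + 1;
                peak.fetch_max(now, Ordering::SeqCst);
                thread::sleep(Duration::from_millis(5)); // stand-in for real work
                in_flight.fetch_sub(1, Ordering::SeqCst);
                sem.increment(); // hand the slot to the next request
            })
        })
        .collect();

    for handle in handles {
        handle.join().unwrap();
    }
    peak.load(Ordering::SeqCst)
}
```

Because the in-flight counter is only raised after `decrement` and lowered before `increment`, the observed peak can never exceed `limit`.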

There is a great free ebook, The Little Book of Semaphores – Green Tea Press, with around 300 pages about semaphores alone. I've been itching to go through it and do all the exercises in Rust...

IIRC, there's a long debate about it, somewhat like tabs vs spaces.

If this is a super critical piece of code, all those unwrap() calls mean you are ignoring error conditions, such as charging into the condition variable after a mutex failure. That is very dicey. The function signature should allow error conditions to be propagated upward.

The failure of one party could impact the fate of other waiters. In real production use, the failure scenarios would obviously have to be analyzed as well. I feel monitors are in general more battle tested, with better fault tolerance.

There is one interesting piece of trivia that may be important. The mutex in the Rust standard library supports mutex poisoning, while the parking_lot version of mutex does not support mutex poisoning.
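
To illustrate the two usual ways of dealing with poisoning in the standard library, here is a hedged sketch. The `Poisoned` error type and the `poisoned_semaphore` helper are invented for this example. The `decrement` below propagates the failure to the caller, while `increment` deliberately ignores poisoning via `PoisonError::into_inner`, on the assumption that a plain integer counter cannot be left half-updated. Which policy is right depends on the application.

```rust
use std::sync::{Arc, Condvar, Mutex, PoisonError};
use std::thread;

/// Hypothetical error type: some thread panicked while holding the lock.
#[derive(Debug, PartialEq, Eq)]
pub struct Poisoned;

pub struct Semaphore {
    permits: Mutex<usize>,
    cond_var: Condvar,
}

impl Semaphore {
    pub fn new(permits: usize) -> Self {
        Semaphore { permits: Mutex::new(permits), cond_var: Condvar::new() }
    }

    /// Option 1: surface poisoning to the caller instead of panicking.
    pub fn decrement(&self) -> Result<(), Poisoned> {
        let guard = self.permits.lock().map_err(|_| Poisoned)?;
        let mut guard = self
            .cond_var
            .wait_while(guard, |p| *p == 0)
            .map_err(|_| Poisoned)?;
        *guard -= 1;
        Ok(())
    }

    /// Option 2: recover the guard and carry on despite poisoning.
    pub fn increment(&self) {
        let mut guard = self
            .permits
            .lock()
            .unwrap_or_else(PoisonError::into_inner);
        *guard += 1;
        self.cond_var.notify_one();
    }

    /// Hypothetical helper used only to inspect the counter.
    pub fn available(&self) -> usize {
        *self.permits.lock().unwrap_or_else(PoisonError::into_inner)
    }
}

/// Hypothetical helper: builds a semaphore with one permit whose mutex has
/// been poisoned by a thread that panicked while holding the lock.
pub fn poisoned_semaphore() -> Arc<Semaphore> {
    let sem = Arc::new(Semaphore::new(1));
    let inner = Arc::clone(&sem);
    let _ = thread::spawn(move || {
        let _guard = inner.permits.lock().unwrap();
        panic!("simulated failure while holding the lock");
    })
    .join(); // the join error is expected and ignored
    sem
}
```

With parking_lot there is no poisoning, so `lock()` returns the guard directly and neither branch would be needed.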