Syncing two threads with condvar (alt. to posix semaphores)


#1

Hello, I’m trying to figure out how to use Condvar to sync two threads, the way POSIX semaphores (sem_wait, sem_post) would.
In this program I want to sync two threads so that they print:

First
Second
First
Second

use std::sync::{Arc, Condvar, Mutex};
use std::thread;

const NO_TH: usize = 2;

fn main() {
    let mut handles  = Vec::with_capacity(NO_TH);
    let guard_first  = Arc::new((Mutex::new(false), Condvar::new()));
    let guard_second = Arc::new((Mutex::new(false), Condvar::new()));
    {
        let guard_first  = guard_first.clone();
        let guard_second = guard_second.clone();
        handles.push(thread::spawn(|| {
            for _ in 0..3 {
                // TODO: "sem_wait" guard_first
                println!("First thread");
                // TODO: "sem_post" guard_second
            }
        }));
    }
    {
        let guard_first  = guard_first.clone();
        let guard_second = guard_second.clone();
        handles.push(thread::spawn(|| {
            for _ in 0..3 {
                // TODO: "sem_wait" guard_second
                println!("Second thread");
                // TODO: "sem_post" guard_first
            }
        }));
    }
    for handle in handles {
        handle.join().unwrap();
    }
}

#2

Not sure if this is exactly what you want, but you can make your own semaphore struct out of a Condvar and Mutex and use that instead of trying to work with them directly. I found a simple C version on this site and translated it into Rust like this:

use std::sync::{Arc, Condvar, Mutex};
use std::thread;

const NO_TH: usize = 2;

pub struct Semaphore {
    mutex: Mutex<i32>,
    cvar: Condvar,
}

impl Semaphore {
    pub fn new(count: i32) -> Self {
        Semaphore {
            mutex: Mutex::new(count),
            cvar: Condvar::new(),
        }
    }

    pub fn dec(&self) {
        let mut lock = self.mutex.lock().unwrap();
        *lock -= 1;
        if *lock < 0 {
            let _ = self.cvar.wait(lock).unwrap();
        }
    }

    pub fn inc(&self) {
        let mut lock = self.mutex.lock().unwrap();
        *lock += 1;
        if *lock <= 0 {
            self.cvar.notify_one();
        }
    }
}

// No `unsafe impl Send/Sync` is needed: Semaphore is automatically
// Send + Sync because Mutex<i32> and Condvar both are.

fn main() {
    let mut handles = Vec::with_capacity(NO_TH);
    let sem1 = Arc::new(Semaphore::new(0));
    let sem2 = Arc::clone(&sem1);

    handles.push(thread::spawn(move || {
        for _ in 0..3 {
            sem1.inc();
            println!("First thread");
            sem1.dec();
        }
    }));

    handles.push(thread::spawn(move || {
        for _ in 0..3 {
            sem2.dec();
            println!("Second thread");
            sem2.inc();
        }
    }));

    for handle in handles {
        handle.join().unwrap();
    }
}

It’s not the most robust thing in the world (it unwraps in a few places and doesn’t guard against counter overflow, and I think it’s not signal-safe, if I’m reading the source page correctly), but hopefully it serves as a proof of concept.
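
For comparison, here is a rough sketch of what working with the Mutex and Condvar pairs directly could look like, keeping the `Arc<(Mutex<bool>, Condvar)>` shape from the original question. The helper names `wait`, `post`, and `run_alternating`, and the `Signal` alias, are made up for illustration. Each pair acts as a binary flag, so a second `post` before a `wait` is not counted, which is enough for strict alternation but is not a full counting semaphore.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

/// Hypothetical alias: a boolean flag plus the condvar used to wait on it.
type Signal = Arc<(Mutex<bool>, Condvar)>;

/// Rough equivalent of sem_wait: block until the flag is set, then clear it.
fn wait(signal: &Signal) {
    let (lock, cvar) = &**signal;
    let mut ready = lock.lock().unwrap();
    // Re-check in a loop so that spurious wakeups are harmless.
    while !*ready {
        ready = cvar.wait(ready).unwrap();
    }
    *ready = false;
}

/// Rough equivalent of sem_post: set the flag and wake one waiter.
fn post(signal: &Signal) {
    let (lock, cvar) = &**signal;
    *lock.lock().unwrap() = true;
    cvar.notify_one();
}

/// Runs two threads that take turns and returns what they logged, in order.
fn run_alternating(rounds: usize) -> Vec<&'static str> {
    let log = Arc::new(Mutex::new(Vec::new()));
    // guard_first starts out "posted" so that the first thread goes first.
    let guard_first: Signal = Arc::new((Mutex::new(true), Condvar::new()));
    let guard_second: Signal = Arc::new((Mutex::new(false), Condvar::new()));

    let first = {
        let log = Arc::clone(&log);
        let gf = Arc::clone(&guard_first);
        let gs = Arc::clone(&guard_second);
        thread::spawn(move || {
            for _ in 0..rounds {
                wait(&gf);
                log.lock().unwrap().push("First");
                post(&gs);
            }
        })
    };
    let second = {
        let log = Arc::clone(&log);
        let gf = Arc::clone(&guard_first);
        let gs = Arc::clone(&guard_second);
        thread::spawn(move || {
            for _ in 0..rounds {
                wait(&gs);
                log.lock().unwrap().push("Second");
                post(&gf);
            }
        })
    };

    first.join().unwrap();
    second.join().unwrap();
    let result = log.lock().unwrap().clone();
    result
}

fn main() {
    for line in run_alternating(3) {
        println!("{line}");
    }
}
```

Starting `guard_first` as `true` plays the role of initialising a semaphore to 1, so the first thread does not block on its first iteration.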


#3

Porting the linked example to Rust was a great idea (although the Rust Condvar documentation recommends checking the condition within a while loop, so I’ve changed that too), but your main does not do what was asked in the OP (printing alternating messages): good semaphore implementation, but wrong usage.

I’ve thus replaced the main with one that does what was asked, and have also gotten rid of the unnecessary Arcs by using scoped threads:

use std::sync::{Condvar, Mutex};

pub struct Semaphore {
    mutex: Mutex<i32>,
    cvar: Condvar,
}

impl Semaphore {
    pub fn new(count: u16) -> Self {
        Semaphore {
            mutex: Mutex::new(i32::from(count)),
            cvar: Condvar::new(),
        }
    }

    pub fn dec(&self) {
        let mut lock = self.mutex.lock().unwrap();
        *lock -= 1;
        while *lock < 0 {
            lock = self.cvar.wait(lock).unwrap();
        }
    }

    pub fn inc(&self) {
        let mut lock = self.mutex.lock().unwrap();
        *lock += 1;
        if *lock <= 0 {
            self.cvar.notify_one();
        }
    }
}

fn main() {
    const N: u32 = 10;
    let sem1 = Semaphore::new(0);
    let sem2 = Semaphore::new(1); // thread 2 said go
    ::crossbeam::scope(|scope| {
        // thread 1
        scope.spawn(|_| {
            (0..N).for_each(|_| {
                sem2.dec(); // wait for thd 2 to say go
                println!("First");
                sem1.inc(); // say go
            })
        });

        // thread 2
        scope.spawn(|_| {
            (0..N).for_each(|_| {
                sem1.dec(); // wait for thd 1 to say go
                println!("Second");
                sem2.inc(); // say go
            })
        });
    })
    .expect("Some thread panicked");
}
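
One caveat, as far as I can tell: because `dec` counts waiters as negative values and then loops on `*lock < 0`, a semaphore with two or more threads blocked on it can lose a wakeup (with a count of -2, one `inc` brings it to -1, and the woken thread sees a negative count and goes back to sleep). That cannot happen in the main above, since each semaphore has at most one waiter. Below is a minimal sketch of a variant that keeps the count non-negative and waits while it is zero, using `Condvar::wait_while` and the standard library's `std::thread::scope` instead of crossbeam. The names `acquire`, `release`, and `alternate` are my own, not from the posts above.

```rust
use std::sync::{Condvar, Mutex};
use std::thread;

pub struct Semaphore {
    permits: Mutex<u32>,
    cvar: Condvar,
}

impl Semaphore {
    pub fn new(permits: u32) -> Self {
        Semaphore {
            permits: Mutex::new(permits),
            cvar: Condvar::new(),
        }
    }

    /// Like sem_wait: block while no permit is available, then take one.
    pub fn acquire(&self) {
        let guard = self.permits.lock().unwrap();
        // wait_while re-checks the predicate after every wakeup,
        // so spurious wakeups and multiple waiters are both handled.
        let mut guard = self.cvar.wait_while(guard, |p| *p == 0).unwrap();
        *guard -= 1;
    }

    /// Like sem_post: add a permit and wake one waiter, if any.
    pub fn release(&self) {
        *self.permits.lock().unwrap() += 1;
        self.cvar.notify_one();
    }
}

/// Two scoped threads take turns; returns the messages in the order logged.
fn alternate(rounds: usize) -> Vec<&'static str> {
    let log = Mutex::new(Vec::new());
    let first_may_go = Semaphore::new(1); // the first thread starts
    let second_may_go = Semaphore::new(0);

    thread::scope(|s| {
        s.spawn(|| {
            for _ in 0..rounds {
                first_may_go.acquire();
                log.lock().unwrap().push("First");
                second_may_go.release();
            }
        });
        s.spawn(|| {
            for _ in 0..rounds {
                second_may_go.acquire();
                log.lock().unwrap().push("Second");
                first_may_go.release();
            }
        });
    });

    log.into_inner().unwrap()
}

fn main() {
    for line in alternate(3) {
        println!("{line}");
    }
}
```

This sketch still unwraps on a poisoned lock and does not guard against the permit count overflowing, so it is a proof of concept rather than a hardened implementation.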

EDIT: a version with N threads
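
The linked N-thread version is not reproduced here. As a rough illustration of one way the idea could generalise, the sketch below uses a single shared turn counter guarded by a Mutex, with a Condvar that every thread waits on until the counter equals its own id. The function name `round_robin` and the log of thread ids are hypothetical, chosen so that the ordering can be checked.

```rust
use std::sync::{Condvar, Mutex};
use std::thread;

/// Runs `n_threads` threads that act strictly in the order 0, 1, ..., n-1,
/// repeated `rounds` times. Returns the ids in the order the threads acted.
fn round_robin(n_threads: usize, rounds: usize) -> Vec<usize> {
    let turn = Mutex::new(0usize); // id of the thread allowed to act next
    let cvar = Condvar::new();
    let log = Mutex::new(Vec::new());

    thread::scope(|s| {
        for id in 0..n_threads {
            let (turn, cvar, log) = (&turn, &cvar, &log);
            s.spawn(move || {
                for _ in 0..rounds {
                    // Sleep until it is this thread's turn.
                    let mut current = cvar
                        .wait_while(turn.lock().unwrap(), |t| *t != id)
                        .unwrap();
                    log.lock().unwrap().push(id);
                    // Hand the turn to the next thread. notify_all is needed
                    // because each waiter is waiting for a different value.
                    *current = (id + 1) % n_threads;
                    cvar.notify_all();
                }
            });
        }
    });

    log.into_inner().unwrap()
}

fn main() {
    for id in round_robin(3, 2) {
        println!("thread {id}");
    }
}
```

With one shared Condvar every hand-off wakes all waiting threads, and all but one go straight back to sleep. One semaphore per thread, as in the two-thread main above, avoids those extra wakeups at the cost of more bookkeeping.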


#4

Thank you for the effort, nice Semaphore implementation.


#5

Thank you. You can also find a Semaphore version using Crossbeam’s MsQueue here
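
The linked MsQueue code is not reproduced here. To give a flavour of the queue-based idea without guessing at Crossbeam's API, here is a small sketch that swaps in the standard library's `std::sync::mpsc` channels: each thread waits for a token on its own channel and passes a token to the other thread when done. The function name `alternate_with_channels` is made up for this example.

```rust
use std::sync::mpsc;
use std::thread;

/// Two threads pass a unit "token" back and forth over channels.
/// Returns the messages in the order they were logged.
fn alternate_with_channels(rounds: usize) -> Vec<&'static str> {
    let (to_first, first_rx) = mpsc::channel::<()>();
    let (to_second, second_rx) = mpsc::channel::<()>();
    let (log_tx, log_rx) = mpsc::channel();

    // Hand the first token to thread 1 so that it starts.
    to_first.send(()).unwrap();

    let log1 = log_tx.clone();
    let first = thread::spawn(move || {
        for _ in 0..rounds {
            first_rx.recv().unwrap(); // like sem_wait
            log1.send("First").unwrap();
            to_second.send(()).unwrap(); // like sem_post
        }
    });
    let second = thread::spawn(move || {
        for _ in 0..rounds {
            second_rx.recv().unwrap();
            log_tx.send("Second").unwrap();
            // Thread 1 may already have exited after its last round,
            // so a closed channel is ignored here.
            let _ = to_first.send(());
        }
    });

    first.join().unwrap();
    second.join().unwrap();
    // Both log senders were moved into the threads and are dropped by now,
    // so this iterator ends once the buffered messages are drained.
    log_rx.iter().collect()
}

fn main() {
    for line in alternate_with_channels(3) {
        println!("{line}");
    }
}
```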


#6

Nice Crossbeam version for N threads.


#7

Ah, those changes make sense. When I tested my version it actually did interleave the print messages as asked, but after testing it some more, it only seems to do so sometimes. Possibly because I didn’t account for spurious wakeups, but also because my main function wasn’t imposing a strong enough relationship between the threads. So nice job fixing that!


#8

Oh, the first time I tested your function it malfunctioned right away, hence my surprise :sweat_smile: I had been scratching my head trying to understand how your code managed to uphold the required invariants, but it turns out that since you were only using one semaphore, there was no way for each thread to know who had printed last, so in practice it behaved as just a mutex.