I am trying to implement a semaphore for the first time. I started with a basic version and ran into an error that I am struggling to understand.
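use std::sync::atomic::{AtomicU32, Ordering::{Acquire, Relaxed, Release}};
use atomic_wait::{wait, wake_one};

const NUMBER_OF_RESOURCES: u32 = 1;

pub struct Semaphore {
    counter: AtomicU32,
}

impl Semaphore {
    pub fn new() -> Self {
        Self {
            counter: AtomicU32::new(NUMBER_OF_RESOURCES),
        }
    }

    // Return a resource and wake one waiter if none were available before.
    pub fn signal(&self) {
        assert!(self.counter.load(Relaxed) < NUMBER_OF_RESOURCES);
        if self.counter.fetch_add(1, Release) == 0 {
            wake_one(&self.counter);
        }
    }

    // Block while the counter is 0, then take a resource.
    pub fn wait(&self) {
        while self.counter.load(Relaxed) == 0 {
            wait(&self.counter, 0);
        }
        self.counter.fetch_sub(1, Acquire);
    }
}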
I use the atomic-wait crate.
The error: threads start out decrementing and incrementing the semaphore normally, then one of the threads sees a counter of u32::MAX instead of 1 when it calls the wait() method. After that, obviously, several threads are able to access the semaphore simultaneously. I am trying to understand what is happening, for educational purposes. I would be very grateful for any suggestions.
Threads can be spuriously woken. (Unless wait somehow guarantees that it has no spurious wakeups?) If such a wake-up occurs around the same time as wake_one, then two threads could both see counter.load(Relaxed) return a nonzero value, allowing both to decrement the counter and wrap it around to u32::MAX.
That’s my guess, anyway. Edit: ah, I just realized, it still fails even if wait(&self.counter, _) has no spurious wakeups. Calling Semaphore::wait concurrently could allow multiple active threads to see that the counter is nonzero and bypass the loop before any of them has decremented it, because the load and the fetch_sub are two separate atomic operations.
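To make the interleaving concrete, here is a minimal sketch that replays it on a single thread, so the outcome is deterministic. The function name replay_race is made up for illustration; the orderings are the ones from the posted wait method.

```rust
use std::sync::atomic::{AtomicU32, Ordering::{Acquire, Relaxed}};

/// Replays, on one thread, the schedule in which two waiters (A and B)
/// both evaluate the loop condition before either of them decrements.
/// `replay_race` is a hypothetical name, not part of the original code.
fn replay_race() -> u32 {
    let counter = AtomicU32::new(1);

    // A and B each run `counter.load(Relaxed) == 0` and both get `false`.
    let seen_by_a = counter.load(Relaxed);
    let seen_by_b = counter.load(Relaxed);
    assert!(seen_by_a != 0 && seen_by_b != 0);

    // Both therefore skip the wait loop and go on to decrement.
    counter.fetch_sub(1, Acquire); // A: 1 -> 0
    counter.fetch_sub(1, Acquire); // B: 0 -> wraps around, fetch_sub wraps on overflow

    counter.load(Relaxed)
}
```

Calling replay_race() returns u32::MAX, which matches the value observed in the question.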
I’d recommend using compare_exchange_weak in a loop instead.
The assert! in Semaphore::signal is similarly futile against concurrent calls to signal.
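For the same reason, a bound on the counter only holds if the check and the increment happen in one atomic step. Below is a sketch of one way to do that with fetch_update. The names try_signal and MAX_PERMITS are hypothetical, and returning false instead of panicking is an assumption about the desired behavior, not something from the original code.

```rust
use std::sync::atomic::{AtomicU32, Ordering::{Relaxed, Release}};

/// Hypothetical upper bound, playing the role of NUMBER_OF_RESOURCES.
const MAX_PERMITS: u32 = 1;

/// Returns one permit unless the counter is already at the bound.
/// The check and the increment are a single atomic update, so two
/// concurrent callers cannot both pass the check and overshoot.
/// On `true`, the caller would then call wake_one on the counter.
fn try_signal(counter: &AtomicU32) -> bool {
    counter
        .fetch_update(Release, Relaxed, |x| {
            if x < MAX_PERMITS { Some(x + 1) } else { None }
        })
        .is_ok()
}
```

With the counter at 0, the first try_signal succeeds and a second one is rejected, leaving the counter at 1.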
Thank you for the suggestions! I will think it over again.
Hi, could you please have a look at this version of a binary semaphore? I am trying to learn the basics. It seems to work.
pub struct Semaphore {
    counter: AtomicU32,
}

impl Semaphore {
    pub fn new() -> Self {
        Self {
            counter: AtomicU32::new(1),
        }
    }

    pub fn signal(&self) {
        self.counter.fetch_add(1, Release);
        wake_one(&self.counter);
    }

    pub fn wait(&self) {
        // Atomically take the semaphore (1 -> 0); sleep while it is 0.
        while self.counter.compare_exchange_weak(1, 0, Acquire, Relaxed).is_err() {
            wait(&self.counter, 0);
        }
    }
}
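One possible caveat with the binary version: if signal is ever called while the semaphore is already free, fetch_add pushes the counter to 2. The exchange from 1 to 0 can then no longer succeed, and wait(&self.counter, 0) returns immediately because the value is not 0, so wait would spin. The sketch below shows this on a single thread and compares it with a signal that stores 1 instead of adding. The helper names are made up for illustration, and the store-based signal is only one option, under the assumption that extra signals should be ignored.

```rust
use std::sync::atomic::{AtomicU32, Ordering::{Acquire, Relaxed, Release}};

/// One non-blocking attempt at the binary acquire (1 -> 0).
/// Uses the strong compare_exchange so the demo has no spurious failures.
fn try_acquire(counter: &AtomicU32) -> bool {
    counter.compare_exchange(1, 0, Acquire, Relaxed).is_ok()
}

/// signal as posted: an unconditional increment.
fn signal_add(counter: &AtomicU32) {
    counter.fetch_add(1, Release);
}

/// Hypothetical alternative: saturate at 1 by storing instead of adding.
fn signal_store(counter: &AtomicU32) {
    counter.store(1, Release);
}

/// Sends one stray signal to a free semaphore, then tries to acquire it.
/// Returns (result with fetch_add signal, result with store signal).
fn extra_signal_demo() -> (bool, bool) {
    let added = AtomicU32::new(1);
    signal_add(&added); // counter becomes 2

    let stored = AtomicU32::new(1);
    signal_store(&stored); // counter stays 1

    (try_acquire(&added), try_acquire(&stored))
}
```

extra_signal_demo() returns (false, true): after the stray fetch_add the acquire fails, while the store-based variant can still be acquired.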
Modification for a semaphore with counter > 1:
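pub fn wait(&self) {
    // fetch_update takes the success (set) ordering first and the
    // load/failure (fetch) ordering second, so Acquire goes first here,
    // matching compare_exchange_weak(1, 0, Acquire, Relaxed) above.
    while self.counter.fetch_update(Acquire, Relaxed, |x| {
        if x > 0 { Some(x - 1) } else { None }
    }).is_err() {
        wait(&self.counter, 0);
    }
}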
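A quick way to check a counting version is to run several threads against it and record how many hold a permit at the same time. The sketch below is std-only: it uses thread::yield_now as a stand-in for the atomic-wait calls, takes Acquire as the success ordering in fetch_update, and the names SpinSemaphore and max_concurrent_holders are invented for this example.

```rust
use std::sync::atomic::{AtomicU32, AtomicUsize, Ordering::{Acquire, Relaxed, Release, SeqCst}};
use std::sync::Arc;
use std::thread;

/// Counting semaphore sketch that yields instead of sleeping on a futex.
pub struct SpinSemaphore {
    counter: AtomicU32,
}

impl SpinSemaphore {
    pub fn new(permits: u32) -> Self {
        Self { counter: AtomicU32::new(permits) }
    }

    pub fn signal(&self) {
        self.counter.fetch_add(1, Release);
        // atomic_wait::wake_one(&self.counter) would go here.
    }

    pub fn wait(&self) {
        // Decrement only if the counter is positive, as one atomic update.
        while self
            .counter
            .fetch_update(Acquire, Relaxed, |x| x.checked_sub(1))
            .is_err()
        {
            thread::yield_now(); // stand-in for atomic_wait::wait(&self.counter, 0)
        }
    }
}

/// Runs `threads` workers for `rounds` iterations each and returns the
/// highest number of simultaneous permit holders that was observed.
pub fn max_concurrent_holders(permits: u32, threads: usize, rounds: usize) -> usize {
    let sem = Arc::new(SpinSemaphore::new(permits));
    let inside = Arc::new(AtomicUsize::new(0));
    let peak = Arc::new(AtomicUsize::new(0));

    let handles: Vec<_> = (0..threads)
        .map(|_| {
            let (sem, inside, peak) = (Arc::clone(&sem), Arc::clone(&inside), Arc::clone(&peak));
            thread::spawn(move || {
                for _ in 0..rounds {
                    sem.wait();
                    let now = inside.fetch_add(1, SeqCst) + 1;
                    peak.fetch_max(now, SeqCst);
                    inside.fetch_sub(1, SeqCst);
                    sem.signal();
                }
            })
        })
        .collect();

    for handle in handles {
        handle.join().unwrap();
    }
    peak.load(SeqCst)
}
```

With a correct wait, max_concurrent_holders(2, 8, 1000) should never exceed 2. Swapping the load-then-fetch_sub version from the first post into wait is a way to see the bound get violated, although how quickly that shows up depends on scheduling.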