How is the `mpsc::channel` receiver guaranteed to be awakened in this situation?

This is a snippet of the core queue implementation behind mpsc::channel in Rust:

impl<T> Queue<T> {
    /// Creates a new queue that is safe to share among multiple producers and
    /// one consumer.
    pub fn new() -> Queue<T> {
        let stub = unsafe { Node::new(None) };
        Queue { head: AtomicPtr::new(stub), tail: UnsafeCell::new(stub) }
    }

    /// Pushes a new value onto this queue.
    pub fn push(&self, t: T) {
        unsafe {
            let n = Node::new(Some(t));
            let prev = self.head.swap(n, Ordering::AcqRel);
            (*prev).next.store(n, Ordering::Release);
        }
    }

    pub fn pop(&self) -> PopResult<T> {
        unsafe {
            let tail = *self.tail.get();
            let next = (*tail).next.load(Ordering::Acquire);

            if !next.is_null() {
                *self.tail.get() = next;
                assert!((*tail).value.is_none());
                assert!((*next).value.is_some());
                let ret = (*next).value.take().unwrap();
                let _: Box<Node<T>> = Box::from_raw(tail);
                return Data(ret);
            }

            if self.head.load(Ordering::Acquire) == tail { Empty } else { Inconsistent }
        }
    }
}

Assuming tx.send(val) in one thread happens first on the timeline, a later rx.recv() in another thread will call Queue::pop. Since there is no happens-before relationship between push and pop, there is a possible execution in which next is null and self.head.load(Ordering::Acquire) == tail is true, as if the call to push had not yet happened; the returned Empty will then make the thread calling pop block and wait. In this situation, if there is no subsequent call to push to add data, how is the blocked thread awakened to read the value we have pushed?

Why do you say there's no happens-before? push() stores to next with Release as its last operation, and pop() reads next with Acquire as its first, and similarly for head as the first and last operations respectively. This forms a release-acquire pairing that should establish a happens-before relationship when they operate on the same queue, if I understand the model correctly.

For an acquire operation to synchronize with a release operation on the same atomic object and form a happens-before relationship, the acquire operation must read the value stored by the release. In the C++ abstract machine (Rust's memory ordering is inherited from C++), absent other coherence rules, a load operation may read any value whose store does not happen after the load. Hence, even though a store operation (regardless of its memory order) happens first on the timeline, a load that happens later is not required to read that value.
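To make that concrete, here is a minimal sketch (the names DATA and READY are illustrative, not from the channel code) of a release store paired with an acquire load; the synchronizes-with edge exists only if the load actually reads the stored value:

use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::thread;

static DATA: AtomicUsize = AtomicUsize::new(0);
static READY: AtomicBool = AtomicBool::new(false);

fn main() {
    let t = thread::spawn(|| {
        DATA.store(42, Ordering::Relaxed);
        READY.store(true, Ordering::Release); // release store
    });
    // Even if the store above already ran on the wall clock, this Acquire
    // load is allowed to read the stale `false`. Only when it actually
    // reads `true` does it synchronize-with the Release store, and only
    // then is DATA == 42 guaranteed to be visible.
    if READY.load(Ordering::Acquire) {
        assert_eq!(DATA.load(Ordering::Relaxed), 42); // cannot fail here
    }
    t.join().unwrap();
}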

See "Can the assertion of the result of the atomic object fail?"

Generally, when you go to sleep, you do this:

  1. Register for wakeups.
  2. Try to receive a message. (calling into your pop)
  3. Go to sleep.

And if a thread then sends a message concurrently, then any send that happens after step 1 will wake up the receiver.

Now, since waking up the receiver and registering for wakeups happen on the same atomic, there are two possible cases:

  1. The wakeup happened before registering for wakeups. In this case, a happens-before is established and pop is guaranteed to see the message.
  2. Registering for wakeups happens before the sender wakes it up. In this case, pop might not see the message, but if so it will wake up immediately and try to receive again.

So the happens-before relationship you need is established by the atomic used to register for wakeups.
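For concreteness, here is a minimal runnable sketch of that three-step protocol, assuming a Mutex-protected queue as a stand-in for the real lock-free one and thread::park/unpark as stand-ins for the real wait/signal tokens; Inner, waiting, and receiver are illustrative names, not the actual std internals:

use std::collections::VecDeque;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;

struct Inner {
    queue: Mutex<VecDeque<i32>>,             // stand-in for the lock-free queue
    waiting: AtomicBool,                     // the atomic both sides agree on
    receiver: Mutex<Option<thread::Thread>>, // whom to wake
}

fn recv(inner: &Inner) -> i32 {
    *inner.receiver.lock().unwrap() = Some(thread::current());
    loop {
        inner.waiting.store(true, Ordering::SeqCst); // 1. register for wakeups
        if let Some(v) = inner.queue.lock().unwrap().pop_front() {
            inner.waiting.store(false, Ordering::SeqCst);
            return v;                                // 2. try to receive
        }
        thread::park();                              // 3. go to sleep
    }
}

fn send(inner: &Inner, v: i32) {
    inner.queue.lock().unwrap().push_back(v);
    // Swap on the same atomic as step 1: either this swap sees the
    // registration and wakes the receiver, or the registration came
    // later and the receiver's step 2 is guaranteed to see the push.
    if inner.waiting.swap(false, Ordering::SeqCst) {
        if let Some(t) = inner.receiver.lock().unwrap().as_ref() {
            t.unpark();
        }
    }
}

fn main() {
    let inner = Arc::new(Inner {
        queue: Mutex::new(VecDeque::new()),
        waiting: AtomicBool::new(false),
        receiver: Mutex::new(None),
    });
    let rx = { let i = Arc::clone(&inner); thread::spawn(move || recv(&i)) };
    send(&inner, 42);
    assert_eq!(rx.join().unwrap(), 42);
}

Note that the park token is sticky: an unpark delivered between steps 2 and 3 makes the following park return immediately, so the receiver cannot sleep through a wakeup.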


Since it is also an atomic object, in the first case the value stored by the wakeup is still not necessarily read by the registration for wakeups, and in that case they do not form a happens-before relationship. I assume "happened before" in the quoted sentence means earlier on the timeline.

Okay, let's look at the specific case of mpsc::channel to try to make it clearer.

(FYI this is old code; this version of mpsc::channel was replaced in #93563.)

There is no blocking code in mpsc_queue; all the blocking code is in the shared module, which wraps mpsc_queue. So if a call to mpsc_queue::pop were to miss a concurrent call to mpsc_queue::push, it's the responsibility of shared to detect this situation and retry instead of blocking.

The relevant atomic is the cnt field, which tracks the number of messages in the channel minus the number of receivers waiting - since this is an mpsc channel, there is at most 1 receiver, so a value of -1 indicates a receiver is waiting on an empty channel.

When concurrent send and recv operations occur and the recv's initial try_recv's pop misses the value that was pushed, there are two possible orders of the operations on cnt:

  1. First decrement fetch_subs it, then send fetch_adds it - in this case send sees that the old value is -1 and knows there is a receiver waiting, so it wakes it up.
  2. First send fetch_adds it, then decrement fetch_subs it - in this case decrement sees that the old value was > 0, so it knows there is a message available and it doesn't need to wait.

In both cases there is now a happens-before relation between the push and the next attempt recv makes to pop - in case 1 it is established by the wakeup, and in case 2 by the sender's increment of cnt synchronizing with the receiver's decrement. This means that the next pop is guaranteed to see the pushed message.


I can't tell if you're agreeing or disagreeing. But on a single atomic location, all writes happen in a total order. This means that either the wakeup or the register comes first in the total order for that atomic location. If the wakeup comes first, then the register reads a value from the wakeup and there's a happens-before. If the register comes first, then the wakeup wakes up the receiver so the receiver will try again after returning Empty.
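Here is a small demonstration of that total order, assuming a few hypothetical threads doing read-modify-writes on one atomic: each RMW reads the value left by the previous write in the modification order, so every old value is observed exactly once and no two RMWs can miss each other.

use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::Arc;
use std::thread;

fn main() {
    let cnt = Arc::new(AtomicI64::new(0));
    let handles: Vec<_> = (0..4)
        .map(|_| {
            let cnt = Arc::clone(&cnt);
            // fetch_add returns the value this RMW read, i.e. the latest
            // value in cnt's modification order at that point
            thread::spawn(move || cnt.fetch_add(1, Ordering::SeqCst))
        })
        .collect();
    let mut olds: Vec<i64> = handles.into_iter().map(|h| h.join().unwrap()).collect();
    olds.sort();
    // Whatever the interleaving, the observed old values are exactly
    // 0, 1, 2, 3: the four RMWs form a single total order on cnt.
    assert_eq!(olds, vec![0, 1, 2, 3]);
}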

In the second case, the fetch_sub can still race with the fetch_add such that the result of fetch_sub is 0, but yes, in that case the fetch_add will see -1, which reduces to case 1.

So the synchronization is guaranteed by cnt and the wakeup, as if the implementation were (roughly):

send:
push();
if cnt.fetch_add(1) <= -1 {  // #1: a receiver registered before this send
    wake();
}

recv:
let r = cnt.fetch_sub(1);
if r > 0 {  // synchronizes with #1: the push happens-before the pop below
    return pop().unwrap();
}
loop {
    match pop() {
        Data(v) => return v,  // message arrived anyway; a pending wake() is harmless
        Empty => wait(),      // woken by wake() in send
    }
}
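For concreteness, here is a runnable toy version of that sketch, assuming a Mutex<VecDeque> as a stand-in for the lock-free queue and park/unpark as stand-ins for wait()/wake(); Chan and its fields are illustrative names, not the real std internals:

use std::collections::VecDeque;
use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;

struct Chan {
    queue: Mutex<VecDeque<i32>>,             // stand-in for the lock-free queue
    cnt: AtomicI64,                          // messages minus waiting receivers
    receiver: Mutex<Option<thread::Thread>>, // stand-in for to_wake
}

fn send(ch: &Chan, v: i32) {
    ch.queue.lock().unwrap().push_back(v);           // push()
    if ch.cnt.fetch_add(1, Ordering::SeqCst) <= -1 { // #1
        // the receiver's fetch_sub came first, so it is (about to be) asleep;
        // seeing -1 also makes its registration of `receiver` visible here
        ch.receiver.lock().unwrap().as_ref().unwrap().unpark(); // wake()
    }
}

fn recv(ch: &Chan) -> i32 {
    *ch.receiver.lock().unwrap() = Some(thread::current()); // register for wakeups
    let r = ch.cnt.fetch_sub(1, Ordering::SeqCst);
    if r > 0 {
        // our decrement read the sender's increment (case 2), so the
        // push happens-before this pop and the message must be there
        return ch.queue.lock().unwrap().pop_front().unwrap();
    }
    loop {
        match ch.queue.lock().unwrap().pop_front() {
            Some(v) => return v,
            None => thread::park(), // woken by unpark() in send (case 1)
        }
    }
}

fn main() {
    let ch = Arc::new(Chan {
        queue: Mutex::new(VecDeque::new()),
        cnt: AtomicI64::new(0),
        receiver: Mutex::new(None),
    });
    let rx = { let ch = Arc::clone(&ch); thread::spawn(move || recv(&ch)) };
    send(&ch, 42);
    assert_eq!(rx.join().unwrap(), 42);
}

This toy supports a single recv per registration; the real Shared additionally rebalances cnt with the steals counter, which is what the following question is about.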

@SNCPlay42 Looking at the source code of Shared, can a send followed by two recvs result in a panic?


    pub fn send(&self, t: T) -> Result<(), T> {
        // See Port::drop for what's going on
        if self.port_dropped.load(Ordering::SeqCst) {
            return Err(t);
        }

        if self.cnt.load(Ordering::SeqCst) < DISCONNECTED + FUDGE {
            return Err(t);
        }

        self.queue.push(t);
        match self.cnt.fetch_add(1, Ordering::SeqCst) {
            -1 => {
                self.take_to_wake().signal();
            }

            n if n < DISCONNECTED + FUDGE => {
                // see the comment in 'try' for a shared channel for why this
                // window of "not disconnected" is ok.
                self.cnt.store(DISCONNECTED, Ordering::SeqCst);

                if self.sender_drain.fetch_add(1, Ordering::SeqCst) == 0 {
                    loop {
                        // drain the queue, for info on the thread yield see the
                        // discussion in try_recv
                        loop {
                            match self.queue.pop() {
                                mpsc::Data(..) => {}
                                mpsc::Empty => break,
                                mpsc::Inconsistent => thread::yield_now(),
                            }
                        }
                        // maybe we're done, if we're not the last ones
                        // here, then we need to go try again.
                        if self.sender_drain.fetch_sub(1, Ordering::SeqCst) == 1 {
                            break;
                        }
                    }

                    // At this point, there may still be data on the queue,
                    // but only if the count hasn't been incremented and
                    // some other sender hasn't finished pushing data just
                    // yet. That sender in question will drain its own data.
                }
            }

            // Can't make any assumptions about this case like in the SPSC case.
            _ => {}
        }

        Ok(())
    }

    pub fn recv(&self, deadline: Option<Instant>) -> Result<T, Failure> {
        // This code is essentially the exact same as that found in the stream
        // case (see stream.rs)
        match self.try_recv() {
            Err(Empty) => {}
            data => return data,
        }

        let (wait_token, signal_token) = blocking::tokens();
        if self.decrement(signal_token) == Installed {
            if let Some(deadline) = deadline {
                let timed_out = !wait_token.wait_max_until(deadline);
                if timed_out {
                    self.abort_selection(false);
                }
            } else {
                wait_token.wait();
            }
        }

        match self.try_recv() {
            data @ Ok(..) => unsafe {
                *self.steals.get() -= 1;
                data
            },
            data => data,
        }
    }

    // Essentially the exact same thing as the stream decrement function.
    // Returns true if blocking should proceed.
    fn decrement(&self, token: SignalToken) -> StartResult {
        unsafe {
            assert_eq!(
                self.to_wake.load(Ordering::SeqCst),
                EMPTY,
                "This is a known bug in the Rust standard library. See https://github.com/rust-lang/rust/issues/39364"
            );
            let ptr = token.to_raw();
            self.to_wake.store(ptr, Ordering::SeqCst);

            let steals = ptr::replace(self.steals.get(), 0);

            match self.cnt.fetch_sub(1 + steals, Ordering::SeqCst) {
                DISCONNECTED => {
                    self.cnt.store(DISCONNECTED, Ordering::SeqCst);
                }
                // If we factor in our steals and notice that the channel has no
                // data, we successfully sleep
                n => {
                    assert!(n >= 0);
                    if n - steals <= 0 {
                        return Installed;
                    }
                }
            }

            self.to_wake.store(EMPTY, Ordering::SeqCst);
            drop(SignalToken::from_raw(ptr));
            Abort
        }
    }

    pub fn try_recv(&self) -> Result<T, Failure> {
        let ret = match self.queue.pop() {
            mpsc::Data(t) => Some(t),
            mpsc::Empty => None,

            mpsc::Inconsistent => {
                let data;
                loop {
                    thread::yield_now();
                    match self.queue.pop() {
                        mpsc::Data(t) => {
                            data = t;
                            break;
                        }
                        mpsc::Empty => panic!("inconsistent => empty"),
                        mpsc::Inconsistent => {}
                    }
                }
                Some(data)
            }
        };
        match ret {
            // See the discussion in the stream implementation for why we
            // might decrement steals.
            Some(data) => unsafe {
                if *self.steals.get() > MAX_STEALS {
                    match self.cnt.swap(0, Ordering::SeqCst) {
                        DISCONNECTED => {
                            self.cnt.store(DISCONNECTED, Ordering::SeqCst);
                        }
                        n => {
                            let m = cmp::min(n, *self.steals.get());
                            *self.steals.get() -= m;
                            self.bump(n - m);
                        }
                    }
                    assert!(*self.steals.get() >= 0);
                }
                *self.steals.get() += 1;
                Ok(data)
            },

            // See the discussion in the stream implementation for why we try
            // again.
            None => {
                match self.cnt.load(Ordering::SeqCst) {
                    n if n != DISCONNECTED => Err(Empty),
                    _ => {
                        match self.queue.pop() {
                            mpsc::Data(t) => Ok(t),
                            mpsc::Empty => Err(Disconnected),
                            // with no senders, an inconsistency is impossible.
                            mpsc::Inconsistent => unreachable!(),
                        }
                    }
                }
            }
        }
    }

The call to send invokes cnt.fetch_add(1) so that cnt == 1. For the first recv, assuming it invokes self.decrement, the call to decrement gives:

steals == 0
self.steals == 0
self.cnt == 0
n == 1
n - steals <= 0 is false
return Abort
*self.steals.get() -= 1; // is invoked
return data

The second recv will do something like this:

// self.decrement is invoked
steals == -1
self.steals == 0
self.cnt == 0 - (1 + -1) == 0
n == 0
n - steals == 1, so n - steals <= 0 is false

And this time

        match self.try_recv() {
            data @ Ok(..) => unsafe {
                *self.steals.get() -= 1;
                data
            },
            data => data,
        }

returns Err(Empty). However, rust/library/std/src/sync/mpsc/mod.rs at commit 34115d040b43d9a0dcc313c6282520a86d1e6f61 in rust-lang/rust shows that Err(Empty) will cause a panic:

    pub fn recv(&self) -> Result<T, RecvError> {
        loop {
            let new_port = match *unsafe { self.inner() } {
                Flavor::Oneshot(ref p) => match p.recv(None) {
                    Ok(t) => return Ok(t),
                    Err(oneshot::Disconnected) => return Err(RecvError),
                    Err(oneshot::Upgraded(rx)) => rx,
                    Err(oneshot::Empty) => unreachable!(),
                },
                Flavor::Stream(ref p) => match p.recv(None) {
                    Ok(t) => return Ok(t),
                    Err(stream::Disconnected) => return Err(RecvError),
                    Err(stream::Upgraded(rx)) => rx,
                    Err(stream::Empty) => unreachable!(),
                },
                Flavor::Shared(ref p) => match p.recv(None) {
                    Ok(t) => return Ok(t),
                    Err(shared::Disconnected) => return Err(RecvError),
                    Err(shared::Empty) => unreachable!(),  //  panic
                },
                Flavor::Sync(ref p) => return p.recv(None).map_err(|_| RecvError),
            };
            unsafe {
                mem::swap(self.inner_mut(), new_port.inner_mut());
            }
        }
    }

Did I miss something?
