Trying to understand the crossbeam MPMC channel in a multi-consumer scenario

In its default form, this example compiles, and the single sender sends messages without errors. Only one receiver gets each message, and it seems to be whichever thread the OS chooses as it sees fit. This is expected according to the documentation of crossbeam-channel: "Note that cloning only creates a new handle to the same sending or receiving side. It does not create a separate stream of messages in any way."

use std::{thread, time::Duration};
use rand::{rng, seq::IndexedRandom};
use crossbeam_channel::{bounded, Receiver};

trait Kid {
    fn start(&self, name: String, receiver: Receiver<String>) {
        thread::spawn(move || {
            loop {
                println!("{name} is doing daily chores and collects spores.");
                if let Ok(message) = receiver.recv() {
                    println!("{name} received: {message}");
                } else {
                    println!("{name} never received but deceived.");
                    break;
                }
                thread::sleep(Duration::from_millis(750));
            }
        });
    }
}

struct NephewAndrew {}
impl Kid for NephewAndrew {}
struct NephewMatthew {}
impl Kid for NephewMatthew {}
struct NieceAnise {}
impl Kid for NieceAnise {}


struct UncleMonocle {
    i_am: String,
}

impl UncleMonocle {
    pub fn start(&self) -> thread::JoinHandle<()> {
        let (sender, receiver) = bounded(1);
        let andrew = NephewAndrew {};
        andrew.start("Nephew Andrew".to_owned(), receiver.clone());
        let matthew = NephewMatthew {};
        matthew.start("Nephew Matthew".to_owned(), receiver.clone());
        let anise = NieceAnise {};
        anise.start("Niece Anise".to_owned(), receiver.clone());

        let uncle_monocle_says = [
            "Comment Component",
            "Trust Rust",
            "Knightly Night",
            "Election Selection",
            "Scrap Crap",
            "Spray Pray",
            "Tribe Scribe",
            "Message Passage",
            "Power Powder",
            "Science Conscience"
        ];

        let name = self.i_am.clone();
        thread::spawn(move || {
            let mut rng = rng();
            let mut count = 0;
            while count < 10 {
                println!("-------------------------");
                println!("{name} does daily errands and spends ten grands.");
                let i_say = uncle_monocle_says.choose(&mut rng).unwrap();

                println!("{name} says: {i_say}");
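                // `expect` takes a plain string and would print "{name}" literally,
                // so format the panic message explicitly instead.
                sender
                    .send(i_say.to_string())
                    .unwrap_or_else(|_| panic!("{name}: Message passage failed and flailed while sending was pending."));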

                thread::sleep(Duration::from_secs(1));
                count += 1;
            }
        })
    }
}

fn main() {
    let um = UncleMonocle {
        i_am: "Uncle Monocle".to_owned(),
    };

    let join_handle = um.start();

    join_handle.join().unwrap();
}


Output:

Nephew Andrew is doing daily chores and collects spores.
Nephew Matthew is doing daily chores and collects spores.
Niece Anise is doing daily chores and collects spores.
-------------------------
Uncle Monocle does daily errands and spends ten grands.
Uncle Monocle says: Tribe Scribe
Niece Anise received: Tribe Scribe
Niece Anise is doing daily chores and collects spores.
-------------------------
Uncle Monocle does daily errands and spends ten grands.
Uncle Monocle says: Science Conscience
Nephew Matthew received: Science Conscience
Nephew Matthew is doing daily chores and collects spores.
-------------------------
Uncle Monocle does daily errands and spends ten grands.
Uncle Monocle says: Message Passage
Nephew Andrew received: Message Passage
Nephew Andrew is doing daily chores and collects spores.
-------------------------
Uncle Monocle does daily errands and spends ten grands.
Uncle Monocle says: Trust Rust
Niece Anise received: Trust Rust
Niece Anise is doing daily chores and collects spores.
-------------------------
Uncle Monocle does daily errands and spends ten grands.
Uncle Monocle says: Scrap Crap
Nephew Matthew received: Scrap Crap
Nephew Matthew is doing daily chores and collects spores.
-------------------------
Uncle Monocle does daily errands and spends ten grands.
Uncle Monocle says: Trust Rust
Nephew Andrew received: Trust Rust
Nephew Andrew is doing daily chores and collects spores.
-------------------------
Uncle Monocle does daily errands and spends ten grands.
Uncle Monocle says: Science Conscience
Niece Anise received: Science Conscience
Niece Anise is doing daily chores and collects spores.
-------------------------
Uncle Monocle does daily errands and spends ten grands.
Uncle Monocle says: Tribe Scribe
Nephew Matthew received: Tribe Scribe
Nephew Matthew is doing daily chores and collects spores.
-------------------------
Uncle Monocle does daily errands and spends ten grands.
Uncle Monocle says: Scrap Crap
Nephew Andrew received: Scrap Crap
Nephew Andrew is doing daily chores and collects spores.
-------------------------
Uncle Monocle does daily errands and spends ten grands.
Uncle Monocle says: Scrap Crap
Niece Anise received: Scrap Crap
Niece Anise is doing daily chores and collects spores.
Nephew Matthew never received but deceived.
Nephew Andrew never received but deceived.
Niece Anise never received but deceived.

Errors:

   Compiling playground v0.0.1 (/playground)
    Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.94s
     Running `target/debug/playground`

When I change the sending code to send each message three times in a loop, every receiver receives the message. This seems to be the fix.

My questions are:

  1. Why is crossbeam designed this way? Why isn't there an internal mechanism that keeps track of the number of receivers and sends the message in a loop itself (instead of client code cloning the message contents n times)?
  2. If the answer to the previous question is default performance and simplicity, then why isn't there an extra API that optionally implements this broadcast functionality? We only pay for what we use, right?
  3. I can understand the message not being received by all receivers, because there weren't three messages in the pipeline to begin with, only one. But I can't understand why receivers don't proceed to do their daily chores if there's no message for them. In the output of the first (broken) version, the only thread that does other work (chores) is the thread that successfully receives the message.
  4. What do bounded and unbounded mean in real terms? What's the difference between bounded(1), bounded(5) and unbounded in the context of this example?

Thanks

Because it is designed to split work across consumers. What you're trying to do is a different use case.

Simply because nobody implemented it. Also, my guess is that the additional complications to solve (e.g. whether to yield references to consumers or clone, and if the latter, how to design the API and how to deal with lagging consumers) are enough that it's not just a matter of blindly adapting the existing code. AFAIK an implementation of broadcast channels is available in tokio, but that will also require you to pull in all the async-related code (note that the broadcast channel can still be used from sync code though).
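For what it's worth, the "clone the message for every consumer" approach can be sketched by hand with one channel per consumer. This is only a minimal sketch, not an existing crossbeam API: `Broadcaster`, `subscribe` and `broadcast` are hypothetical names, and it uses `std::sync::mpsc` so that it runs without extra crates (the same shape should work with one crossbeam channel per consumer). It sidesteps the lagging-consumer problem by using unbounded queues.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread;

/// Hypothetical helper: owns one sender per subscribed consumer.
struct Broadcaster<T> {
    senders: Vec<Sender<T>>,
}

impl<T: Clone> Broadcaster<T> {
    fn new() -> Self {
        Broadcaster { senders: Vec::new() }
    }

    /// Register a consumer and hand back its private receiving end.
    fn subscribe(&mut self) -> Receiver<T> {
        let (tx, rx) = channel();
        self.senders.push(tx);
        rx
    }

    /// Clone the message into every consumer's queue.
    /// Consumers whose receiver was dropped are forgotten.
    fn broadcast(&mut self, msg: T) {
        self.senders.retain(|tx| tx.send(msg.clone()).is_ok());
    }
}

/// Spawn three consumers and return what each one received.
fn demo() -> Vec<Vec<String>> {
    let mut uncle: Broadcaster<String> = Broadcaster::new();
    let kids: Vec<_> = (0..3)
        .map(|_| {
            let rx = uncle.subscribe();
            // Each kid collects messages until its channel is closed.
            thread::spawn(move || rx.iter().collect::<Vec<String>>())
        })
        .collect();

    for msg in ["Trust Rust", "Tribe Scribe"] {
        uncle.broadcast(msg.to_string());
    }
    drop(uncle); // drops all senders, so every consumer's iterator ends

    kids.into_iter().map(|h| h.join().unwrap()).collect()
}

fn main() {
    for (i, got) in demo().iter().enumerate() {
        println!("consumer {i} received {got:?}");
    }
}
```

Every consumer sees every message, at the cost of one clone per consumer per message, which is the trade-off a built-in broadcast API would have to make a decision about.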

Receiver::recv is blocking, i.e. before returning it waits until a message is received (or the channel is closed). If there's no message it thus waits for one, and the code doesn't reach the chore part. You have to use try_recv if you want to check if there are messages in that exact moment in time, and if not continue immediately with the rest of your code.
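A rough sketch of the `try_recv` variant, in case it helps. It uses `std::sync::mpsc` only so that it runs without extra crates; crossbeam's `Receiver` also has a `try_recv` that distinguishes "empty" from "disconnected", so the loop should look much the same there. The function name and the sleep durations are made up for the illustration.

```rust
use std::sync::mpsc::{channel, TryRecvError};
use std::thread;
use std::time::Duration;

/// Poll the channel without blocking and do "chores" whenever it is empty.
/// Returns how many chore rounds ran and which messages arrived.
fn chores_while_waiting() -> (u32, Vec<String>) {
    let (sender, receiver) = channel::<String>();

    let producer = thread::spawn(move || {
        thread::sleep(Duration::from_millis(200));
        sender.send("Trust Rust".to_string()).unwrap();
        // `sender` is dropped here, which disconnects the channel.
    });

    let mut chores = 0;
    let mut received = Vec::new();
    loop {
        match receiver.try_recv() {
            // A message was already waiting: take it.
            Ok(message) => received.push(message),
            // Nothing yet: carry on with other work instead of blocking.
            Err(TryRecvError::Empty) => {
                chores += 1;
                thread::sleep(Duration::from_millis(20));
            }
            // All senders are gone and the queue is drained.
            Err(TryRecvError::Disconnected) => break,
        }
    }
    producer.join().unwrap();
    (chores, received)
}

fn main() {
    let (chores, received) = chores_while_waiting();
    println!("did {chores} rounds of chores, received {received:?}");
}
```

With a blocking `recv` in the same place, the chore counter would never advance while the channel is empty.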


I added another question I forgot to ask:

What does bounded and unbounded mean in real terms? What's the difference between bounded(1), bounded(5) and unbounded in the context of this example?

It's about the buffer where messages are stored until a consumer receives them. A bounded buffer has a fixed size and will block senders when they try to send a message while the buffer is full. An unbounded buffer has no such behaviour and instead allows an arbitrary number of pending messages (this can sometimes be undesirable though!)
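To make the difference concrete, here is a small sketch that fills a bounded channel with nobody receiving and counts how many messages fit. It uses the standard library (`sync_channel(n)` and `channel()`) as stand-ins for crossbeam's `bounded(n)` and `unbounded()`, purely so it runs without extra crates; `accepted_before_full` is a made-up helper name.

```rust
use std::sync::mpsc::{channel, sync_channel, TrySendError};

/// How many messages a bounded channel of the given capacity accepts
/// before it reports "full", when no receiver is draining it.
fn accepted_before_full(capacity: usize) -> usize {
    // `_receiver` stays alive until the end of the function,
    // so the channel is never disconnected while we send.
    let (sender, _receiver) = sync_channel::<u32>(capacity);
    let mut accepted = 0;
    loop {
        // `try_send` fails instead of blocking when the buffer is full;
        // a plain `send` would block the sender at this point.
        match sender.try_send(0) {
            Ok(()) => accepted += 1,
            Err(TrySendError::Full(_)) => return accepted,
            Err(TrySendError::Disconnected(_)) => unreachable!("receiver is still alive"),
        }
    }
}

fn main() {
    println!("capacity 1 accepts {}", accepted_before_full(1)); // 1
    println!("capacity 5 accepts {}", accepted_before_full(5)); // 5

    // An unbounded channel never blocks the sender; messages simply pile up.
    let (sender, receiver) = channel::<u32>();
    for i in 0..1000 {
        sender.send(i).unwrap();
    }
    drop(sender);
    println!("unbounded held {}", receiver.iter().count()); // 1000
}
```

In the example from the question, the uncle sends once per second and a kid picks each message up almost immediately, so the buffer never holds more than one message and all three variants would behave the same.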


I think I understand now. Since the default use case of multi-threading is thought to be splitting expensive computational loads across multiple execution units, the multi-receiver scenario is also thought of like this:

Send the task to any available receiver to compute. If that receiver is busy, try another one that is idle at that moment. This maximizes efficiency when handling an arbitrary number and weight of computations.

A non-default use case of multi-threading is something like what I do in the example: thinking of multiple threads as semi-independent control flows for distributed, simulation-like settings.

Thanks a lot for the explanation. So, related to my example: since there's only one producer of messages and the sending operations are synchronous (one at a time), bounded(1) is the way to go in my case. If n producers send to the channel simultaneously, then we need to increase the buffer size, because some of the senders might get blocked if they send while a message is already in transit.

Correct me if I'm wrong in any of this. Thanks.

Technically speaking, tasks/messages are not sent directly to receivers, nor do senders try to send to a specific receiver and retry with the next one if it's busy. Instead you can think of a channel as a kind of queue, with senders pushing into this queue and receivers popping from it. Then on top of this there's a kind of notification system that allows senders to "wake up" a receiver if there's one currently waiting for a message.
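As a mental model only (this is not how crossbeam is actually implemented), the "queue plus notification" idea can be sketched with a mutex-protected `VecDeque` and a condition variable. `ToyQueue` and its methods are made-up names for the illustration.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

/// Toy model of a channel: a shared queue plus a way to wake a waiter.
struct ToyQueue<T> {
    items: Mutex<VecDeque<T>>,
    not_empty: Condvar,
}

impl<T> ToyQueue<T> {
    fn new() -> Self {
        ToyQueue { items: Mutex::new(VecDeque::new()), not_empty: Condvar::new() }
    }

    /// "Send": push into the queue, then wake one waiting consumer, if any.
    fn push(&self, item: T) {
        self.items.lock().unwrap().push_back(item);
        self.not_empty.notify_one();
    }

    /// "Receive": pop from the queue, sleeping while it is empty.
    fn pop(&self) -> T {
        let mut items = self.items.lock().unwrap();
        loop {
            if let Some(item) = items.pop_front() {
                return item;
            }
            // Releases the lock while asleep and re-acquires it on wake-up.
            items = self.not_empty.wait(items).unwrap();
        }
    }
}

/// Three consumers each pop one item; returns the items sorted.
fn demo() -> Vec<u32> {
    let queue = Arc::new(ToyQueue::<u32>::new());
    let consumers: Vec<_> = (0..3)
        .map(|_| {
            let queue = Arc::clone(&queue);
            thread::spawn(move || queue.pop())
        })
        .collect();

    for i in 0..3 {
        queue.push(i);
    }

    let mut got: Vec<u32> = consumers.into_iter().map(|h| h.join().unwrap()).collect();
    got.sort(); // which consumer gets which item is up to the scheduler
    got
}

fn main() {
    println!("{:?}", demo()); // [0, 1, 2]
}
```

Nothing in `push` picks a particular consumer: whichever waiting thread wakes up and grabs the lock first takes the item, which matches the "OS chooses as it sees fit" behaviour seen in the output above.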

Which of the two examples (i.e. the initial one or the one where you send 3 messages together)? In any case, for a robust system I would try to give slightly more capacity than what's theoretically needed, otherwise you risk that the correctness of your system depends on its timing, which is hard both to test and to debug.
