Using crossbeam channels as broadcast channels

Hello, I'm building something that has a feature like Pub/Sub similar to the one that in Redis, where some clients connect and subscribe to some topic/channel and other clients can publish data to that channel.

I thought at first that would be easier by using channels where every client handle a clone of tx and rx of that channel so the client can recv and send data over this channel.

I made a simple program (without any connection to the outworld) to test this idea.

//# crossbeam = "0.7.2"

use crossbeam::channel;
use std::thread;
use std::time::Duration;
fn main() {
    let (tx, rx) = channel::bounded(2);
    let mut threads = Vec::new();
    for i in 1..6 {
        let rx = rx.clone();
        let handle = thread::spawn(move || {
            println!("thread #{} started !", i);
            for item in rx.recv() {
                println!("thread #{} got item: {}", i, item);
            }
        });
        threads.push(handle);
    }
    thread::sleep(Duration::from_millis(500));
    // send simple data
    tx.send(true).unwrap();
    for thread in threads {
        let _ = thread.join();
    }
}

and I started to run this program multiple times

$ cargo play bus.rs

Output #1:

thread #2 started !
thread #1 started !
thread #3 started !
thread #5 started !
thread #4 started !
thread #1 got item: true

Output #2:

thread #1 started !
thread #2 started !
thread #3 started !
thread #5 started !
thread #4 started !
thread #2 got item: true

Output #3:

thread #3 started !
thread #4 started !
thread #2 started !
thread #5 started !
thread #1 started !
thread #3 got item: true

so what I concluded from that simple test is that crossbeam channels will send the data at most once to one subscriber randomly! ( I'm not sure of that ).

so is there any way to send the item once from the tx and it would send as many clones of it to all the rx s?

Small Notes:

  1. I'm using cargo play a small and nice project for running your code easily without setting up a cargo project (think a local version of play.rust-lang.org)
  2. I'm aware of the bus crate which doses just like that, but I'm looking for a using crossbeam.

oh, I forgot to mention @stjepang here :sweat_smile:

Unfortunately, Crossbeam's channels cannot broadcast messages, so you would have to create one channel per subscriber. To publish a message, the publisher would then do something like:

for s in subscribers {
    s.send(msg.clone()).unwrap();
}
1 Like

hmm :thinking:, so there is no workaround this? is there another way to implement this feature?
another small question: what is the performance impact of creating a detected channel for every subscriber and keeping only the first created reference of the tx and rx so it would be spsc instead of mpmc? is that a good usage of the crossbeam channels? instead of using say the futures one?

I'm not comparing here the two, I just curious to know if the crossbeam channel best suited for such a task!

It's probably fine to use a loop for broadcasts, as long as you're not dynamically adding and removing subscribers, since then you have to clean the list up. You should use the future channel if you're sending to a future, otherwise use crossbeam.

1 Like

As you've discovered, crossbeam channels don't implement the 'fan-out' and copying or reference-counting that would be necessary for a broadcast pattern on a single channel. You'll need to add an abstraction to model the subscription topology, as you're thinking about here.

When you do, there's no reason it has to be spsc (though of course it can be if that's all you happen to need). There's also no reason that it has to involve an explosion of channels with the potential efficiency concerns.

I suggest you model the problem as:

  • a topic, which creates a channel and clones the tx for every publisher
  • a subscriber, which creates a channel, and gives a clone of the tx to each topic it subscribes to
  • a message, which wraps the actual content and a topic identifier (so the subscriber knows where it came from on its single rx) in an Arc<>

Then the topic keeps track of its subscribers, and sends a clone of the message to each one.

This has some nice properties:

  • you only have n_topics + n_subs channels, not some multiplicative (or worse) overhead
  • subscriptions can be dropped with a simple error handler in the topic if the subscriber goes away
  • you have mpmc pub/sub, but you're only using channels in an mpsc pattern, which gives you a wider choice of possible implementations, including the futures ones if that's what you end up needing.
2 Likes

There is the excellent bus crate for that very use csde.

https://crates.io/crates/bus

2 Likes