Having multiple receivers listening to the same sender in Rust?

I am trying to have multiple receivers listening to one channel in Rust. The receivers run in parallel threads, so it does not matter which of the listening receivers gets any given message.

Basically, this is my code:

use std::sync::mpsc::{channel, Sender, Receiver, TryRecvError};
use std::thread::{spawn, sleep};
use std::time::Duration;
use rand::{thread_rng, Rng};

fn main() {
    let (tx1, rx1) = channel();
    let (tx2, rx2) = channel();
    for i in 0..3 {
        let tx1 = tx1.clone();
        let rx2 = rx2.clone();
        spawn(move || consumer(i, tx1, rx2));
    }
    let vec_int: Vec<u64> = vec![3, 1, 2, 2, 1, 3, 4, 4, 2];
    spawn(move || producer(vec_int, rx1, tx2));
    loop {}
}

fn consumer(thread: i32, request: Sender<bool>, response: Receiver<u64>) {
    let mut receive_counter = 3;
    loop {
        request.send(true).unwrap();
        let r = response.recv().unwrap();
        println!("Thread {} received {}", thread, r);
        receive_counter -= 1;
        if receive_counter == 0 {
            println!("Thread {} is done!", thread);
            break;
        } else {
            sleep(Duration::from_secs(r))
        }
    }
}

fn producer(mut vec_u64: Vec<u64>, request: Receiver<bool>, response: Sender<u64>) {
    loop {
        match request.try_recv() {
            Ok(_) => {
                let send_val = vec_u64.swap_remove(0);
                response.send(send_val);
                if vec_u64.len() == 0 {
                    println!("Finishing producing");
                    break;
                }
            }
            _ => {}
        }
    }
}

The problem is that a Sender can be cloned but a Receiver cannot, so the rx2.clone() call does not compile. Is there a good way to handle this?
Thank you.

The channels in std::sync::mpsc are multiple-producer, single-consumer channels, which means they only support one receiver per channel. The crossbeam_channel crate provides multiple-consumer channels with a similar interface.
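
If adding a dependency is not an option, one common workaround is to keep the single std receiver and share it between threads behind an Arc<Mutex<..>>. The following is only a minimal sketch of that idea, not something taken from the code above: the `fan_out` helper and its signature are hypothetical names chosen for illustration, and a worker holds the lock while it waits in recv(), so the others queue up on the mutex in the meantime.

```rust
use std::sync::mpsc::{channel, Receiver};
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical helper: spawns `workers` threads that all pull from one
/// shared std receiver and returns the values each worker received.
fn fan_out(items: Vec<u64>, workers: usize) -> Vec<Vec<u64>> {
    let (tx, rx) = channel::<u64>();
    // Receiver is not Clone, so share the single receiver behind Arc<Mutex<..>>.
    let rx: Arc<Mutex<Receiver<u64>>> = Arc::new(Mutex::new(rx));

    let handles: Vec<_> = (0..workers)
        .map(|_| {
            let rx = Arc::clone(&rx);
            thread::spawn(move || {
                let mut got = Vec::new();
                loop {
                    // The lock guard is a temporary that is dropped at the end
                    // of this statement, before the message is processed.
                    let msg = rx.lock().unwrap().recv();
                    match msg {
                        Ok(v) => got.push(v),
                        // The sender was dropped and the channel is drained.
                        Err(_) => break,
                    }
                }
                got
            })
        })
        .collect();

    for item in items {
        tx.send(item).unwrap();
    }
    // Close the channel so that the workers' recv() calls return Err.
    drop(tx);

    handles.into_iter().map(|h| h.join().unwrap()).collect()
}

fn main() {
    let per_worker = fan_out(vec![3, 1, 2, 2, 1, 3, 4, 4, 2], 3);
    for (i, got) in per_worker.iter().enumerate() {
        println!("Worker {} received {:?}", i, got);
    }
}
```

Which worker ends up with which value depends on thread scheduling, so the split differs from run to run; only the combined set of received values is fixed.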


For anyone looking later for how to do this with crossbeam_channel:

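use crossbeam_channel::{unbounded, Receiver, Sender};
use std::thread::{sleep, spawn};
use std::time::Duration;

fn main() {
    // tx1/rx1 carries work requests; tx2/rx2 carries the values handed out.
    let (tx1, rx1) = unbounded();
    let (tx2, rx2) = unbounded();
    let mut handles = Vec::new();
    for i in 0..3 {
        let tx1 = tx1.clone();
        // Unlike std::sync::mpsc, crossbeam_channel receivers can be cloned.
        let rx2 = rx2.clone();
        handles.push(spawn(move || consumer(i, tx1, rx2)));
    }
    let vec_int: Vec<u64> = vec![3, 1, 2, 2, 1, 3, 4, 4, 2];
    handles.push(spawn(move || producer(vec_int, rx1, tx2)));
    // Wait for all threads to finish instead of spinning in `loop {}`.
    for handle in handles {
        handle.join().unwrap();
    }
}

fn consumer(thread: i32, request: Sender<bool>, response: Receiver<u64>) {
    let mut receive_counter = 3;
    loop {
        // Ask the producer for a value, then block until one arrives.
        request.send(true).unwrap();
        let r = response.recv().unwrap();
        println!("Thread {} received {}", thread, r);
        receive_counter -= 1;
        if receive_counter == 0 {
            println!("Thread {} is done!", thread);
            break;
        } else {
            // Simulate work that takes `r` seconds.
            sleep(Duration::from_secs(r));
        }
    }
}

fn producer(mut vec_u64: Vec<u64>, request: Receiver<bool>, response: Sender<u64>) {
    // Block until a consumer asks for work rather than busy-polling with try_recv().
    while request.recv().is_ok() {
        // swap_remove(0) is O(1) but does not preserve the original order.
        let send_val = vec_u64.swap_remove(0);
        response.send(send_val).unwrap();
        if vec_u64.is_empty() {
            println!("Finishing producing");
            break;
        }
    }
}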
