Multiple receiver mpsc channel using Arc<Mutex<Receiver>>

Hello, I'm trying to share one receiver among multiple threads using Arc<Mutex<Receiver>>, but one thread blocks everything and I don't know why. Can you explain and solve the problem?

use std::sync::mpsc::{self, Sender, Receiver};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

fn main() {

    let (tx, rx): (Sender<String>, Receiver<String>) = mpsc::channel();

    let rx = Arc::new(Mutex::new(rx));

    let mut join_handles = vec![];

    for id in 0..2 {

        let rx = rx.clone();

        let join_handle = thread::spawn(move || loop {

            match rx.lock().unwrap().recv() {

                Ok(msg) => {
                    println!("Thread {} got a msg: {}", id, msg);
                    thread::sleep(Duration::from_millis(500));
                }

                Err(e) => {
                    println!("Thread {} got a error: {}", id, e);
                    break;
                }
            }
        });

        join_handles.push(join_handle);
    }

    for i in 0..5 {
        tx.send(format!("msg#{}", i)).unwrap();
    }

    drop(tx);

    for join_handle in join_handles {
        join_handle.join().unwrap();
    }

}

The best solution is to avoid std::sync::mpsc in favor of crossbeam::channel. (crossbeam is a crate that would be in std if Rust had a kitchen-sink std.)


I would use that in a real-life situation, but I'm doing this to learn.

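Your thread is holding the lock for the entire sleep duration. The MutexGuard returned by rx.lock().unwrap() is a temporary in the match scrutinee, so it is not dropped until the end of the match, which includes the sleep in the Ok arm. When the thread wakes up, it releases the lock and immediately loops around to take it again, so in practice it's the only thread awake to acquire the lock. crossbeam will have the same behavior if its receiver is locked the same way, FWIW.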
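To see the guard lifetime in isolation, here is a minimal sketch (not the original program; the function name guard_held_in_match_arm is made up for illustration). It uses try_lock to check whether the mutex is still locked inside the match arm and again after the match ends.

```rust
use std::sync::Mutex;

// Hypothetical helper: reports whether the mutex is locked
// (a) inside a match arm whose scrutinee took the lock, and
// (b) after that match has finished.
fn guard_held_in_match_arm() -> (bool, bool) {
    let m = Mutex::new(0);

    let locked_during_arm;
    // The guard from `m.lock().unwrap()` is a temporary that lives
    // until the end of the whole `match` expression.
    match *m.lock().unwrap() {
        _ => {
            // try_lock fails here because the temporary guard is still alive.
            locked_during_arm = m.try_lock().is_err();
        }
    }

    // The match is over, so the temporary guard has been dropped.
    let locked_after = m.try_lock().is_err();

    (locked_during_arm, locked_after)
}

fn main() {
    let (during, after) = guard_held_in_match_arm();
    println!("locked inside the arm: {}", during);
    println!("locked after the match: {}", after);
}
```

The same rule applies to match rx.lock().unwrap().recv() in the question: everything inside the arms, including the sleep, runs with the lock held.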

You can work around this in a few ways. For example, assign the MutexGuard to a variable, and drop it just before the thread is put to sleep: https://play.rust-lang.org/?version=stable&mode=debug&edition=2018&gist=a9e909f77ced3ee0c2f3cd5741c6d4bc
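In case the playground link goes stale, here is a rough sketch of the explicit-drop idea. It is my own reconstruction, not necessarily what the linked gist contains. The run_workers helper, its parameters, and the shorter 50 ms sleep are assumptions made so that the result is easy to check.

```rust
use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

// Hypothetical helper: spawns `workers` threads that share one receiver
// and returns how many messages each worker handled.
fn run_workers(workers: usize, messages: usize) -> Vec<usize> {
    let (tx, rx) = mpsc::channel::<String>();
    let rx = Arc::new(Mutex::new(rx));

    let handles: Vec<_> = (0..workers)
        .map(|id| {
            let rx = Arc::clone(&rx);
            thread::spawn(move || {
                let mut handled = 0usize;
                loop {
                    // Bind the guard to a variable so we control its lifetime.
                    let guard = rx.lock().unwrap();
                    let result = guard.recv();
                    // Release the lock before processing and sleeping.
                    drop(guard);

                    match result {
                        Ok(msg) => {
                            println!("Thread {} got a msg: {}", id, msg);
                            handled += 1;
                            thread::sleep(Duration::from_millis(50));
                        }
                        Err(e) => {
                            println!("Thread {} got an error: {}", id, e);
                            break;
                        }
                    }
                }
                handled
            })
        })
        .collect();

    for i in 0..messages {
        tx.send(format!("msg#{}", i)).unwrap();
    }
    // Dropping the sender makes recv() return Err once the queue is empty.
    drop(tx);

    handles.into_iter().map(|h| h.join().unwrap()).collect()
}

fn main() {
    let counts = run_workers(2, 5);
    println!("messages handled per thread: {:?}", counts);
}
```

How the messages are split between the threads depends on scheduling, so the per-thread counts can vary from run to run. Only the total is fixed.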

Another approach is to use block scopes instead of temporary variable bindings and explicit drop: https://play.rust-lang.org/?version=stable&mode=debug&edition=2018&gist=30dcca327ea9e83c1aa55fbcdb2f22a2
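A sketch of the block-scope variant, again my own reconstruction rather than the contents of the linked gist. The helper name recv_scoped is made up. The guard only exists inside the inner block, so the lock is already released by the time the caller looks at the result or goes to sleep.

```rust
use std::sync::mpsc::{self, Receiver, RecvError};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

// Hypothetical helper: receives one message while holding the lock only
// for the duration of the inner block.
fn recv_scoped(rx: &Mutex<Receiver<String>>) -> Result<String, RecvError> {
    let result = {
        let guard = rx.lock().unwrap();
        guard.recv()
    }; // `guard` goes out of scope here, which unlocks the mutex.
    result
}

fn main() {
    let (tx, rx) = mpsc::channel::<String>();
    let rx = Arc::new(Mutex::new(rx));

    let handles: Vec<_> = (0..2)
        .map(|id| {
            let rx = Arc::clone(&rx);
            thread::spawn(move || {
                // The lock is not held while this thread prints or sleeps.
                while let Ok(msg) = recv_scoped(&rx) {
                    println!("Thread {} got a msg: {}", id, msg);
                    thread::sleep(Duration::from_millis(50));
                }
                println!("Thread {} is done: channel closed", id);
            })
        })
        .collect();

    for i in 0..5 {
        tx.send(format!("msg#{}", i)).unwrap();
    }
    drop(tx);

    for handle in handles {
        handle.join().unwrap();
    }
}
```

Note that a thread still holds the lock while it is blocked inside recv(). That is fine here: the other threads would only be waiting for a message anyway.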


With crossbeam, the Arc<Mutex<_>> is unnecessary, as you can clone the Receiver directly and send it across threads.

impl<T> Sync for Receiver<T>
where
    T: Send,

Nice! Thanks for the heads up on that.
