How to use a vec of channels?

Hello. I'm trying to spawn multiple threads which each ask the main thread for a job to work on. Then the main thread will respond to each thread with a job.

I can't quite get it to work. Here's my code. I can't figure out how to use the channel that is stored in the vector.

use std::sync::{Arc, mpsc};
use std::thread;

fn main() {
    let (ready_tx, ready_rx) = mpsc::channel();
    let mut work_channels = Vec::new();

    for _ in 0..10 {
        let channel = mpsc::channel::<String>();
        work_channels.push(channel);
    }

    let mut threads = Vec::new();

    for i in 0..10 {
        let ready_tx = ready_tx.clone();
        let chan = Arc::new(work_channels[i]);
        let thread = thread::spawn(move || {
            let (_, work_rx) = chan.deref(); // Can't figure this part out.
            loop {
                ready_tx.send(i).unwrap();
                let job = work_rx.recv().unwrap();
            }
        });
        threads.push(thread);
    }
    
    let i = ready_rx.recv().unwrap();
    let (work_tx, _) = work_channels[i];
    work_tx.send(String::from("Here's a job")).unwrap();
}

Thanks for the help!

First of all, don't use std::sync::mpsc. It's much slower and less flexible than crossbeam-channel.

Crossbeam channels are cloneable, so you can make one channel pair, clone it to all threads, and then collect it all on a single receiver.

std::sync::mpsc doesn't support "select", so you can't wait for a message from whichever channel is ready first. At best you can send one message to each channel, and then wait for them in order: for ch in work_channels { ch.recv() }.

Remember that this is all blocking, so you must send before you receive.
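To make the "wait for them in order" idea concrete, here is a minimal sketch using only the standard library. The function name collect_in_order and the squared-number "result" are made up for illustration. Each worker gets its own channel and sends exactly one message, and the main thread then blocks on each receiver in turn.

```rust
use std::sync::mpsc::{self, Receiver};
use std::thread;

/// Hypothetical helper: spawns `n` workers, each with its own channel,
/// then collects one result per worker in spawn order.
fn collect_in_order(n: usize) -> Vec<usize> {
    let mut receivers: Vec<Receiver<usize>> = Vec::new();

    for i in 0..n {
        let (tx, rx) = mpsc::channel();
        receivers.push(rx);
        // The Sender is moved into the worker; the Receiver stays here.
        thread::spawn(move || {
            tx.send(i * i).unwrap();
        });
    }

    // recv() blocks until that particular worker has sent its message,
    // so a slow first worker delays reading from all the others.
    receivers.iter().map(|rx| rx.recv().unwrap()).collect()
}

fn main() {
    println!("{:?}", collect_in_order(4)); // prints [0, 1, 4, 9]
}
```

The results come back in spawn order regardless of which worker finishes first, which is exactly the limitation of not having "select".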

And if you only want to distribute work across threads, then use rayon.

Oh cool, I'll definitely check those out!

Any tips on why my code doesn't work though? I feel like I'm misunderstanding something fundamental about ownership, borrowing, etc.

The code you posted gives this error:

error[E0599]: no method named `deref` found for struct `Arc<(Sender<String>, std::sync::mpsc::Receiver<String>)>` in the current scope
  --> src/main.rs:19:37
   |
19 |             let (_, work_rx) = chan.deref(); // Can't figure this part out.
   |                                     ^^^^^ method not found in `Arc<(Sender<String>, std::sync::mpsc::Receiver<String>)>`
   |
   = help: items from traits can only be used if the trait is in scope
help: the following trait is implemented but not in scope; perhaps add a `use` for it:
   |
1  | use std::ops::Deref;
   |

If you follow the help text and import the Deref trait, then you get this error:

error[E0277]: `Sender<String>` cannot be shared between threads safely
   --> src/main.rs:19:22
    |
19  |         let thread = thread::spawn(move || {
    |                      ^^^^^^^^^^^^^ `Sender<String>` cannot be shared between threads safely
    |
    = help: within `(Sender<String>, std::sync::mpsc::Receiver<String>)`, the trait `Sync` is not implemented for `Sender<String>`
    = note: required because it appears within the type `(Sender<String>, std::sync::mpsc::Receiver<String>)`
    = note: required because of the requirements on the impl of `Send` for `Arc<(Sender<String>, std::sync::mpsc::Receiver<String>)>`
    = note: required because it appears within the type `[closure@src/main.rs:19:36: 25:10]`

error[E0277]: `std::sync::mpsc::Receiver<String>` cannot be shared between threads safely
   --> src/main.rs:19:22
    |
19  |         let thread = thread::spawn(move || {
    |                      ^^^^^^^^^^^^^ `std::sync::mpsc::Receiver<String>` cannot be shared between threads safely
    |
    = help: within `(Sender<String>, std::sync::mpsc::Receiver<String>)`, the trait `Sync` is not implemented for `std::sync::mpsc::Receiver<String>`
    = note: required because it appears within the type `(Sender<String>, std::sync::mpsc::Receiver<String>)`
    = note: required because of the requirements on the impl of `Send` for `Arc<(Sender<String>, std::sync::mpsc::Receiver<String>)>`
    = note: required because it appears within the type `[closure@src/main.rs:19:36: 25:10]`

This is telling you that you can't have multiple threads with access to the same Sender or Receiver. Instead, the Senders must stay on the main thread, and each Receiver needs to be moved into a worker thread, like this: Rust Playground
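The playground link isn't reproduced here, so what follows is only a sketch of that arrangement and not necessarily the linked code. The run_workers function, its return value (jobs handled per worker), and the shutdown-by-dropping-senders logic are my own additions for illustration. The important part is that only the Receiver half of each work channel is moved into its thread, while the Senders stay in a Vec on the main thread.

```rust
use std::sync::mpsc::{self, Sender};
use std::thread;

/// Hypothetical helper: hands each job to whichever worker reports ready,
/// and returns how many jobs each worker handled.
/// Assumes `workers > 0` whenever `jobs` is non-empty.
fn run_workers(workers: usize, jobs: Vec<String>) -> Vec<usize> {
    let (ready_tx, ready_rx) = mpsc::channel::<usize>();
    let mut work_txs: Vec<Sender<String>> = Vec::new();
    let mut threads = Vec::new();

    for i in 0..workers {
        let (work_tx, work_rx) = mpsc::channel::<String>();
        work_txs.push(work_tx); // the Sender stays on the main thread
        let ready_tx = ready_tx.clone();
        // Only `work_rx` and the cloned `ready_tx` are moved into the thread.
        threads.push(thread::spawn(move || {
            let mut done = 0;
            loop {
                // Tell the main thread that worker `i` is idle.
                if ready_tx.send(i).is_err() {
                    break;
                }
                match work_rx.recv() {
                    Ok(_job) => done += 1, // real work would go here
                    Err(_) => break,       // Sender dropped: no more jobs
                }
            }
            done
        }));
    }
    drop(ready_tx); // the workers hold the only remaining clones

    for job in jobs {
        let i = ready_rx.recv().unwrap();
        work_txs[i].send(job).unwrap();
    }
    drop(work_txs); // closes every work channel so the workers exit

    threads.into_iter().map(|t| t.join().unwrap()).collect()
}

fn main() {
    let jobs = (0..20).map(|n| format!("job {n}")).collect();
    let counts = run_workers(4, jobs);
    println!("jobs handled per worker: {counts:?}");
}
```

No Arc is needed, because nothing is shared: each Receiver has exactly one owner (its worker thread), and each Sender has exactly one owner (the main thread's Vec).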


Ok, I think I see. I was trying to move tuples of (Sender, Receiver) onto the thread, but only Receivers can be moved.

Follow up question... these Senders and Receivers are user defined types? How does the compiler know that one can be moved to a thread and the other can't? It says "The trait Sync is not implemented for Sender", so then how does a thread know that it can't use vars that don't implement that trait?

In the implementation of thread::spawn is there some annotation that says "don't allow moving of vars that don't implement Sync trait?"

Thanks again!

Minor correction: Both Sender and Receiver can be moved to another thread (they implement the Send trait), but neither can be shared across multiple threads (they don't implement the Sync trait).

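The std::thread::spawn function requires the closure it is given (and therefore everything the closure captures), as well as the closure's return value, to be Send. That means it will reject any captured value whose type can't be moved to another thread.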
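So there is no special annotation: it's an ordinary trait bound in the function signature. As a rough illustration, here is a wrapper with the same bounds that std::thread::spawn declares. The names spawn_like and moved_len are made up for this example.

```rust
use std::thread::{self, JoinHandle};

/// Hypothetical wrapper that repeats the bounds of `std::thread::spawn`:
/// both the closure and its return value must be `Send + 'static`.
fn spawn_like<F, T>(f: F) -> JoinHandle<T>
where
    F: FnOnce() -> T + Send + 'static,
    T: Send + 'static,
{
    thread::spawn(f)
}

/// Moves an owned String into another thread and returns its length.
fn moved_len() -> usize {
    let owned = String::from("hello");
    // `String` is Send, so a closure that captures it by move is Send too.
    spawn_like(move || owned.len()).join().unwrap()
}

fn main() {
    println!("{}", moved_len()); // prints 5

    // This would be rejected at compile time, because `std::rc::Rc<i32>`
    // is not Send and the closure captures it:
    //
    //     let rc = std::rc::Rc::new(1);
    //     spawn_like(move || *rc);
}
```

A closure is Send only if everything it captures is Send, which is how the compiler "knows" without any thread-specific magic.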

If you send an Arc<T> to another thread, that could result in multiple threads having access to the T that it points to (because there can be multiple Arc<T> pointing to the same T). Therefore, Arc<T> implements Send only if T implements Sync. This is why you can't send an Arc<(Sender, Receiver)> to another thread.
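If you really do want several threads pulling from one Receiver, the usual workaround is to wrap it in a Mutex, since Mutex<T> is Sync whenever T is Send. Here is a minimal sketch, with helper names (assert_send, assert_sync, shared_receiver_total) invented for the example. I use Receiver for the checks because, as far as I know, newer Rust releases (1.72 and later) made Sender Sync, while Receiver still is not.

```rust
use std::sync::mpsc::{self, Receiver};
use std::sync::{Arc, Mutex};
use std::thread;

// Compile-time probes: calling them only checks the trait bound.
fn assert_send<T: Send>() {}
fn assert_sync<T: Sync>() {}

/// Hypothetical helper: several workers share ONE Receiver through
/// Arc<Mutex<..>>; returns the sum of everything they received.
fn shared_receiver_total(workers: usize, items: Vec<u32>) -> u32 {
    let (tx, rx) = mpsc::channel::<u32>();
    let rx = Arc::new(Mutex::new(rx));

    let handles: Vec<_> = (0..workers)
        .map(|_| {
            let rx = Arc::clone(&rx);
            thread::spawn(move || {
                let mut sum = 0;
                loop {
                    // The lock guard is a temporary, released at the end
                    // of this statement.
                    let next = rx.lock().unwrap().recv();
                    match next {
                        Ok(n) => sum += n,
                        Err(_) => break, // Sender dropped and queue empty
                    }
                }
                sum
            })
        })
        .collect();

    for n in items {
        tx.send(n).unwrap();
    }
    drop(tx); // lets the workers' recv() return Err so they exit

    handles.into_iter().map(|h| h.join().unwrap()).sum()
}

fn main() {
    assert_send::<Receiver<String>>(); // can be moved to one thread
    assert_sync::<Mutex<Receiver<String>>>(); // Mutex adds Sync
    assert_send::<Arc<Mutex<Receiver<String>>>>(); // so the Arc is Send
    // assert_send::<Arc<Receiver<String>>>(); // does not compile

    println!("{}", shared_receiver_total(3, vec![1, 2, 3, 4])); // prints 10
}
```

Which worker receives which item is not deterministic, but every item is received exactly once, so the total is.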

I wrote a lot more about Send and Sync in: Rust: A unique perspective


I should also note that the standard library uses the unstable #[rustc_on_unimplemented] attribute to provide custom compiler errors when a Send or Sync bound is unsatisfied: marker.rs - source
