I would like to use an mpsc channel using multiple threads writing to the channel and the main thread reading from the channel in a classic producer-reducer pattern. All examples I've found, however, use unscoped threads, and I need scoped threads.
The problem is that as soon as you have scoped threads the code reading from the channel is executed after all threads are finished, so if you have a lot of sends you either get a deadlock from a full channel or you use a lot of memory.
I have tried a lot of different configurations, but since the receiver can't be sent across threads I cannot receive inside the thread_pool.scope closure.
Has anybody a suggestion on how to do this? As an example, you can consider any example in the documentation of std::sync::mpsc::channel.
Note that the documentation contains an example using scoped threads, but it reads from the channels after the scope, and once again, in this case all messages are enqueued and they are dequeued and processed only after all threads have finished.
Some code example would help make your question a lot more clear.
You write “scoped threads” and “thread_pool.scope closure” is this referring to std::thread::scope or something else?
Assuming std::thread::scope, you write “the code reading from the channel is executed after all threads are finished”, but with std::thread::scope that’s not a given; only if you put that code after the end of the scope, which is easy to avoid by … well … not putting it there, but inside of the scope.
You write “the receiver can't be sent across threads” but that explanation doesn’t seem to make sense to me because on one hand std::sync::mpsc::Receiver<T>does implement Send[1], and on the other hand (still, assuming std::thread::scope is used) I don’t see how a Send requirement would come about from “receiving [from the channel] inside of the scope closure” in the first place. Also perhaps that addresses the “putting the code reading from the channel inside of the scope” suggestion I’ve made above. Either you’re misdiagnosing the problem you actually ran into, or you’re not giving enough context.
Er... you're right. Here's the playground. My problem is that this works with std:: thread::scope, but it does not work with a ThreadPool::scope() in rayon, God knows why, in the sense that if I try the same code with rayon and a ThreadPool the compiler complains that rx is not Send + Sync.
Still, I would like to obtain an effect like that of this playgroud, that is, simply iterating on the receiver, but reading from the receiver in parallel with the spawned threads writing to the transmitter. In this example all threads must be finished before the channel is read. (The ´drop(tx)` is necessary to make the iteration stop.)
Is theree any built-in method to do this? I can do something standard like, say, have each spawned thread send a final poison pill and count them while iterating, but I was wondering if there's some ready-made way.
mpsc is "multi-producer", which means you can make copies of the sender. All you need to do is clone it for each thread, then you can drop your first copy once all threads are spawned.
fn main() {
let (tx, rx) = std::sync::mpsc::channel();
std::thread::scope(|scope| {
for _ in 0..10 {
let tx_clone = tx.clone();
scope.spawn(move || {
tx_clone.send(0).unwrap();
});
}
drop(tx);
for _ in 0..10 {
eprintln!("{}", rx.recv().unwrap());
}
});
}