Hi,
I am using crossbeam-channel
as a multi-producer-multi-consumer-channel. I have a problem similar to this one:
extern crate crossbeam_channel;
extern crate threadpool;
use crossbeam_channel as channel;
use threadpool::ThreadPool;
const WORKER: usize = 4;
fn main() {
let (s, r) = channel::unbounded();
s.send(3);
let pool = ThreadPool::new(WORKER);
for _ in 0..WORKER {
let (s, r) = (s.clone(), r.clone());
pool.execute(move || loop {
match r.recv() {
Some(val) => {
println!("{}", val);
if val > 1 {
for _ in 0..val {
s.send(val - 1);
}
}
}
None => break,
}
});
}
pool.join();
}
This never finishes since all worker-threads block in r.recv
. Basically I don't have a way to predict when all workers stop producing new values, only that this eventually finishes. I guess I want my workers to terminate when all workers are blocked in recv
. To solve this I am using a timeout and loop until the queue is empty:
#[macro_use]
extern crate crossbeam_channel;
extern crate threadpool;
use std::time::Duration;
use crossbeam_channel as channel;
use threadpool::ThreadPool;
const WORKER: usize = 4;
fn main() {
let (s, r) = channel::unbounded();
s.send(3);
let pool = ThreadPool::new(WORKER);
let timeout = Duration::from_millis(10);
loop {
for _ in 0..WORKER {
let (s, r) = (s.clone(), r.clone());
pool.execute(move || loop {
select! {
recv(r, msg) => {
match msg {
Some(val) => {
println!("{}", val);
if val > 1 {
for _ in 0..val {
s.send(val - 1);
}
}
}
None => break,
}
}
recv(channel::after(timeout)) => break,
}
});
}
pool.join();
match r.try_recv() {
Some(val) => {
s.send(val);
}
None => break,
}
}
}
I am pretty sure that there is a better and more idiomatic way to solve this. How would you solve this?