I'm trying to make a multi-threaded app, but refrain from using async.
I want to make an app with the following config:
producer thread, that generates data items
input queue (where producer writes to)
worker threads
output queue, where workers write to (may be MPSC)
The main thread will be reading from the output queue.
Question is: which package to use for these queues? MPSC won't work as input queue, because it has only one consumer. Crossbeam ArrayQueue seems to be what I need, but in the examples I see if the queue is full, the pushing thread receives Err (I thought it should just wait.), and I guess it needs some code to make it wait.
I found another project, deadqueue, but the sample code from README deadlocks and does nothing.
crossbeam-channel is the best. A bounded channel is equivalent to a queue with a backpressure. It doesn't come with a threadpool, you'll have to hook one up yourself, but because it's mpmc channel, it's as easy as spawning N threads and receiving on them.
use std::thread;
use std::time::Duration;
use crossbeam_channel::bounded;
fn main() {
let (s, r) = bounded(10);
let mut threads:Vec<thread::JoinHandle<_>> = vec![];
for i in 0..10 {
let r2 = r.clone();
threads.push(thread::spawn(move || {
loop {
let x = r2.recv().unwrap();
println!("thread {} receieved {:?}", i, x);
thread::sleep(Duration::from_secs(2));
if x == -1 {
break
}
}
println!("thread {:?} ending", i);
}));
}
for i in 0..30 {
println!("sending {:?}", i);
s.send(i).unwrap();
}
for _ in 0..10 { s.send(-1).unwrap(); }
for th in threads { th.join().unwrap(); }
}
FYI, the normal way to shut down the thread is to simply drop the sender.
for i in 0..30 {
println!("sending {:?}", i);
s.send(i).unwrap();
}
drop(s);
At that point, the recv call on all receivers will return None. With your unwrap that would cause a panic, but you can do the following to simply exit the loop when it happens:
while let Some(x) = r2.recv() {
println!("thread {} receieved {:?}", i, x);
thread::sleep(Duration::from_secs(2));
}