Please recommend a queue with backpressure, for simple threads (no async yet)

I'm trying to make a multi-threaded app, but refrain from using async.

I want to make an app with the following config:

  • producer thread, that generates data items
  • input queue (where producer writes to)
  • worker threads
  • output queue, where workers write to (may be MPSC)

The main thread will be reading from the output queue.

Question is: which package to use for these queues? MPSC won't work as input queue, because it has only one consumer. Crossbeam ArrayQueue seems to be what I need, but in the examples I see if the queue is full, the pushing thread receives Err (I thought it should just wait.), and I guess it needs some code to make it wait.

I found another project, deadqueue, but the sample code from README deadlocks and does nothing.

crossbeam-channel is the best. A bounded channel is equivalent to a queue with a backpressure. It doesn't come with a threadpool, you'll have to hook one up yourself, but because it's mpmc channel, it's as easy as spawning N threads and receiving on them.

7 Likes

Oh, somehow I oversaw this. Thanks a lot!

My test code that worked right away:

use std::thread;
use std::time::Duration;
use crossbeam_channel::bounded;

fn main() {
	let (s, r) = bounded(10);

	let mut threads:Vec<thread::JoinHandle<_>> = vec![];
	for i in 0..10 {
		let r2 = r.clone();
		threads.push(thread::spawn(move || {
			loop {
				let x = r2.recv().unwrap();
				println!("thread {} receieved {:?}", i, x);
				thread::sleep(Duration::from_secs(2));
				if x == -1 {
					break
				}
			}
			println!("thread {:?} ending", i);
		}));
	}

	for i in 0..30 {
		println!("sending {:?}", i);
		s.send(i).unwrap();
	}
	for _ in 0..10 { s.send(-1).unwrap(); }
	for th in threads { th.join().unwrap(); }
}

FYI, the normal way to shut down the thread is to simply drop the sender.

for i in 0..30 {
	println!("sending {:?}", i);
	s.send(i).unwrap();
}
drop(s);

At that point, the recv call on all receivers will return None. With your unwrap that would cause a panic, but you can do the following to simply exit the loop when it happens:

while let Some(x) = r2.recv() {
    println!("thread {} receieved {:?}", i, x);
    thread::sleep(Duration::from_secs(2));
}
2 Likes

Thanks! That's a lot easier than to send sentinels. Minor correction: recv().ok()

			while let Some(x) = r2.recv().ok() {
				println!("thread {} receieved {:?}", i, x);
				thread::sleep(Duration::from_secs(2));
			}

1 Like

Oh, if it returns a Result you can probably do

while let Ok(x) = r2.recv() {

to save a few characters.

1 Like

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.