Crossbeam-channel: loop until no more values produced

Hi,

I am using crossbeam-channel as a multi-producer-multi-consumer-channel. I have a problem similar to this one:

extern crate crossbeam_channel;
extern crate threadpool;

use crossbeam_channel as channel;
use threadpool::ThreadPool;

const WORKER: usize = 4;

fn main() {
    let (s, r) = channel::unbounded();
    s.send(3);

    let pool = ThreadPool::new(WORKER);

    for _ in 0..WORKER {
        let (s, r) = (s.clone(), r.clone());
        pool.execute(move || loop {
            match r.recv() {
                Some(val) => {
                    println!("{}", val);
                    if val > 1 {
                        for _ in 0..val {
                            s.send(val - 1);
                        }
                    }
                }

                None => break,
            }
        });
    }

    pool.join();
}

This never finishes since all worker-threads block in r.recv. Basically I don’t have a way to predict when all workers stop producing new values, only that this eventually finishes. I guess I want my workers to terminate when all workers are blocked in recv. To solve this I am using a timeout and loop until the queue is empty:

#[macro_use]
extern crate crossbeam_channel;
extern crate threadpool;

use std::time::Duration;

use crossbeam_channel as channel;
use threadpool::ThreadPool;

const WORKER: usize = 4;

fn main() {
    let (s, r) = channel::unbounded();
    s.send(3);

    let pool = ThreadPool::new(WORKER);
    let timeout = Duration::from_millis(10);

    loop {
        for _ in 0..WORKER {
            let (s, r) = (s.clone(), r.clone());
            pool.execute(move || loop {
                select! {
                    recv(r, msg) => {
                        match msg {
                            Some(val) => {
                                println!("{}", val);
                                if val > 1 {
                                    for _ in 0..val {
                                        s.send(val - 1);
                                    }
                                }
                            }

                            None => break,
                        }
                    }

                    recv(channel::after(timeout)) => break,
                }
            });
        }

        pool.join();

        match r.try_recv() {
            Some(val) => {
                s.send(val);
            }

            None => break,
        }
    }

}

I am pretty sure that there is a better and more idiomatic way to solve this. How would you solve this?

1 Like

The typical way to solve such problems is to drop the sender once they are no longer needed.

This is not applicable in this case, because there are cycles between workers: each worker owns the sender and receiver halves of the same channel. Could you perhaps rearrange the code such that this is not the case?

Alternatively, this looks very much like a work-stealing problem, so perhaps rayon will be a better abstraction then crossbeam-channel?

One possible direct solution to the problem is to track the number of in-progress tasks and explicitly initiate a shutdown from the thread that finishes the last task:

https://play.rust-lang.org/?version=stable&mode=debug&edition=2015&gist=72eccbe321279243cdb8124e9cf76f16

Similar to @matklad’s suggestion, I’d recommend sending an explicit message indicating termination. So, instead of sending a plain i32, consider sending something like:

 enum Msg {
    Data(i32),
    Terminate,
}

Then, whoever is generating messages and knows when things are over can turn off the lights by sending Msg::Terminate.

Thanks for your answers! It really seems that crossbeam-channel might not be the best crate for this use case. I am now looking into crossbeam-deque which seems to fit better for my use case.