mpsc/crossbeam channel doesn't terminate, need some help understanding why

Hi folks! I'm a Rust noob trying to build something. For the last 3-4 days I've been stuck on an issue where my program doesn't terminate, and I haven't been able to understand why. Hence, posting here for help.

Basically, I'm trying to process a set of files using channels on a thread pool created with rayon (below is a snippet of the core logic). This code doesn't seem to terminate.

I've tried multiple things with no luck. Any idea what I could be doing wrong? I might be missing something obvious here, but I'm not sure. (Code snippet is below.)

Things tried:

  1. Removed the thread pool, switched to thread::spawn and joined those handles; didn't help.
  2. I was initially using receiver.iter() instead of receiver.recv() and thought switching would help, but it didn't.
  3. It turns out the program terminates when I use receiver.try_recv(). But from what I understood from the documentation, try_recv() doesn't wait, and I want the receiver thread to wait until the sender is dropped.
  4. Removed the pool completely and just processed everything serially on receiver_thread; that doesn't work either. This is when I realised I'm making some basic blunder.
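On point 3, a small std-only sketch may show why try_recv() only appears to fix things. The observe function and the extra "go" channel below are illustrative and not part of the original code. try_recv() returns Err(TryRecvError::Empty) the moment the channel is momentarily empty, even while a sender is still alive, so a while let Ok(id) = receiver.try_recv() loop can exit before the workers have sent their follow-up IDs. recv() instead waits, and only returns Err once the queue is empty and every sender is gone.

```rust
use std::sync::mpsc::{self, RecvError, TryRecvError};
use std::thread;

/// Observes the channel at three points: while a sender is alive but has not
/// sent yet, after it sends, and after it has been dropped.
fn observe() -> (Result<u32, TryRecvError>, Result<u32, RecvError>, Result<u32, RecvError>) {
    let (sender, receiver) = mpsc::channel::<u32>();
    // A second channel used only as a "go" signal, so the timing is deterministic.
    let (go_tx, go_rx) = mpsc::channel::<()>();

    let worker = thread::spawn(move || {
        go_rx.recv().unwrap(); // wait until the main thread says go
        sender.send(2).unwrap();
        // `sender` is dropped here, when the closure returns.
    });

    // Nothing sent yet, but a sender is alive: try_recv returns immediately.
    let first = receiver.try_recv();
    go_tx.send(()).unwrap();
    // recv blocks until the worker's message arrives.
    let second = receiver.recv();
    worker.join().unwrap();
    // Queue empty and every sender gone: now recv returns Err.
    let third = receiver.recv();
    (first, second, third)
}

fn main() {
    println!("{:?}", observe());
}
```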

Also, any tips on how I can debug this better?

  1. VS Code debugging shows my threads are still running, but I can't understand why or for what. The stack frame code is in assembly.
  2. I tried using FlameGraph to visualize where execution is stuck, but it doesn't run on Mac (dtrace: failed to initialize dtrace).

Thanks in advance!

Code snippet:

```rust
use rayon::prelude::*;
use std::collections::HashMap;
use std::sync::{mpsc, Arc, Mutex};
use std::{error, thread, time};

fn parallel_compute() -> Result<(), Box<dyn error::Error>> {
    let (sender, receiver) = mpsc::channel();
    let map: Arc<Mutex<HashMap<u32, u32>>> = Arc::new(Mutex::new(HashMap::new()));

    let pool = rayon::ThreadPoolBuilder::new().build()?;
    // Seed the channel with the first job.
    sender.send(1u32)?;

    let map_clone = map.clone();
    let sender_clone = sender.clone();
    let receiver_thread = thread::spawn(move || {
        // Keep pulling job IDs until recv() returns Err.
        while let Ok(id) = receiver.recv() {
            let sender_inner_clone = sender_clone.clone();
            let map_inner_clone = map_clone.clone();
            // Each job records its ID and, for id < 5, queues the next ID.
            pool.spawn(move || {
                println!("Processing {}", id);
                map_inner_clone.lock().unwrap().insert(id, id);
                if id < 5 {
                    thread::sleep(time::Duration::from_secs(1));
                    sender_inner_clone.send(id + 1).unwrap();
                }
            });
        }
        drop(pool);
    });

    // Drop the original sender, expecting the receiver thread to finish once all senders are gone.
    drop(sender);
    receiver_thread.join().unwrap();
    println!("Done processing");
    Ok(())
}

fn main() {
    parallel_compute().unwrap();
}
```

The problem is that sender_clone lives until the end of receiver_thread.
receiver_thread never ends, because receiver.recv() blocks the current thread while the channel is empty and at least one sender is still alive.
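A minimal std-only sketch of that situation (probe and lingering are illustrative names, not from the original code): after the original sender is dropped, a single leftover clone is enough to keep the receiver waiting. recv_timeout stands in for recv here only so the demo itself cannot hang.

```rust
use std::sync::mpsc::{self, RecvTimeoutError};
use std::time::Duration;

/// Checks whether the receiver would still be blocked after the original
/// sender is dropped, optionally dropping the leftover clone as well.
fn probe(drop_clone_too: bool) -> Result<u32, RecvTimeoutError> {
    let (sender, receiver) = mpsc::channel::<u32>();
    // Plays the role of `sender_clone` in the original code.
    let mut lingering = Some(sender.clone());
    // Mirrors `drop(sender)` in parallel_compute.
    drop(sender);
    if drop_clone_too {
        lingering = None; // drops the last remaining sender
    }
    let result = receiver.recv_timeout(Duration::from_millis(50));
    drop(lingering);
    result
}

fn main() {
    // Clone still alive: recv would keep waiting (we get Timeout here).
    println!("{:?}", probe(false));
    // No senders left: the receiver is told the channel is disconnected.
    println!("{:?}", probe(true));
}
```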

One workaround is to use recv_timeout instead of recv.
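A sketch of what that workaround could look like. To keep it runnable without dependencies, it swaps rayon's pool for plain std::thread::spawn, and it assumes a 500 ms timeout and a 10 ms per-job sleep; both values are arbitrary choices for illustration. The loop ends on the first timeout even though sender_clone is still alive, so it silently stops early if any job ever takes longer than the timeout to send its follow-up.

```rust
use std::collections::HashMap;
use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

fn parallel_compute() -> Vec<u32> {
    let (sender, receiver) = mpsc::channel::<u32>();
    let map: Arc<Mutex<HashMap<u32, u32>>> = Arc::new(Mutex::new(HashMap::new()));
    // Seed the channel with the first job.
    sender.send(1).unwrap();

    let map_clone = Arc::clone(&map);
    let sender_clone = sender.clone();
    let receiver_thread = thread::spawn(move || {
        let mut workers = Vec::new();
        // ASSUMPTION: no job takes longer than 500 ms to send its follow-up.
        // recv_timeout returns Err on timeout, ending the loop even though
        // `sender_clone` is still alive.
        while let Ok(id) = receiver.recv_timeout(Duration::from_millis(500)) {
            let sender_inner_clone = sender_clone.clone();
            let map_inner_clone = Arc::clone(&map_clone);
            workers.push(thread::spawn(move || {
                println!("Processing {}", id);
                map_inner_clone.lock().unwrap().insert(id, id);
                if id < 5 {
                    thread::sleep(Duration::from_millis(10));
                    sender_inner_clone.send(id + 1).unwrap();
                }
            }));
        }
        // Wait for any job that might still be running.
        for worker in workers {
            worker.join().unwrap();
        }
    });

    drop(sender);
    receiver_thread.join().unwrap();
    let mut ids: Vec<u32> = map.lock().unwrap().keys().copied().collect();
    ids.sort();
    ids
}

fn main() {
    println!("Done processing: {:?}", parallel_compute());
}
```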

Thank you, that fixes it!

Out of curiosity, is there a good design pattern or a better way to handle this sort of problem than setting a timeout?

You could have each worker spawn further work directly; then the workers are the only ones holding a sender, and the receiver will be unblocked when they're finished. You'll need a work function with explicit arguments, though, because closures can't be recursive or self-referential.
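One possible reading of that suggestion, as a sketch: process is a hypothetical work function, std::thread::spawn stands in for the rayon pool, and the channel is assumed to carry finished IDs back to the main thread. Each job hands its sender (and the map) to the follow-up job it spawns, and the main thread keeps no sender of its own, so receiver.iter() ends by itself once the last job returns without spawning anything.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{self, Sender};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

/// Hypothetical work function: records `id`, reports it on `done`, and
/// spawns the follow-up job itself instead of routing it through the receiver.
fn process(id: u32, map: Arc<Mutex<HashMap<u32, u32>>>, done: Sender<u32>) {
    map.lock().unwrap().insert(id, id);
    done.send(id).unwrap();
    if id < 5 {
        thread::sleep(Duration::from_millis(10));
        // Move the sender into the follow-up job. When a job returns without
        // spawning (id == 5), the last sender is dropped.
        thread::spawn(move || process(id + 1, map, done));
    }
}

fn parallel_compute() -> Vec<u32> {
    let (sender, receiver) = mpsc::channel();
    let map = Arc::new(Mutex::new(HashMap::new()));
    // The only sender goes to the first job; this thread keeps none.
    let first_map = Arc::clone(&map);
    thread::spawn(move || process(1, first_map, sender));
    // Iteration ends once every job has dropped its sender.
    receiver.iter().collect()
}

fn main() {
    println!("Finished: {:?}", parallel_compute());
}
```

If a job needed to fan out into several follow-ups, it would pass each one a done.clone() instead of moving done; the termination argument stays the same. With rayon, the same shape should carry over by calling rayon::spawn (or spawning on a shared pool) in place of thread::spawn.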

If you use scope for spawning, you can also borrow &Mutex from outside the scope, instead of needing Arc clones everywhere. The scope won't return until all associated spawns are complete.
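A sketch of the scoped variant. It uses std::thread::scope (stable since Rust 1.63) as a dependency-free stand-in for rayon's scope, and process is again a hypothetical work function. The map is a plain &Mutex borrowed from the enclosing function, with no Arc, and no channel is needed at all: thread::scope only returns after every thread spawned in it, including nested spawns, has finished.

```rust
use std::collections::HashMap;
use std::sync::Mutex;
use std::thread::{self, Scope};
use std::time::Duration;

/// Hypothetical work function. `map` is an ordinary borrow: the scope
/// guarantees every spawned thread finishes before `map` goes out of scope.
fn process<'scope, 'env>(
    s: &'scope Scope<'scope, 'env>,
    id: u32,
    map: &'env Mutex<HashMap<u32, u32>>,
) {
    map.lock().unwrap().insert(id, id);
    if id < 5 {
        thread::sleep(Duration::from_millis(10));
        // Spawn the follow-up job into the same scope.
        s.spawn(move || process(s, id + 1, map));
    }
}

fn parallel_compute() -> HashMap<u32, u32> {
    let map = Mutex::new(HashMap::new());
    // Returns only after all threads spawned in the scope have finished.
    thread::scope(|s| process(s, 1, &map));
    map.into_inner().unwrap()
}

fn main() {
    let map = parallel_compute();
    println!("Processed {} ids", map.len());
}
```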