Hi folks! I'm a Rust noob trying to build something. For the last 3-4 days I've been facing an issue where my program doesn't terminate, and I haven't been able to understand why. Hence, posting here for help.
Basically, I'm trying to process a set of files using channels on a thread pool created with rayon (a snippet of the core logic is below). This code doesn't seem to terminate.
I've tried multiple things with no luck. Any idea what I could be doing wrong? I might be missing something obvious here, but I'm not sure.
(Code snippet is below)
Things tried:
- Removed thread pools and moved to `thread::spawn` and joining those handles; that didn't help.
- Was initially using `receiver.iter()` instead of `receiver.recv()` and thought switching would help, but it didn't.
- Turns out the program terminates when I use `receiver.try_recv()`. But from what I understood from the documentation, `try_recv()` doesn't wait, and I would like the receiver thread to wait until the sender is dropped.
- Removed the pool completely and processed serially on `receiver_thread`; that doesn't work either. So this is when I realised I'm making some basic blunder.
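To make the third point concrete, here is a minimal sketch of how I understand the difference between `recv()` and `try_recv()` from the `std::sync::mpsc` docs. It is separate from my actual program, and the function names `try_recv_on_empty_channel` and `recv_until_disconnected` are made up purely for illustration.

```rust
use std::sync::mpsc::{self, RecvError, TryRecvError};
use std::thread;
use std::time::Duration;

/// `try_recv()` never blocks: with a live sender and nothing queued,
/// it returns `Err(TryRecvError::Empty)` right away.
fn try_recv_on_empty_channel() -> Result<u32, TryRecvError> {
    let (sender, receiver) = mpsc::channel::<u32>();
    let result = receiver.try_recv();
    drop(sender); // the sender was still alive during the call above
    result
}

/// `recv()` blocks until a message arrives, and only returns `Err`
/// once every `Sender` (the original and all clones) has been dropped.
fn recv_until_disconnected() -> (Vec<u32>, RecvError) {
    let (sender, receiver) = mpsc::channel::<u32>();
    let sender_clone = sender.clone();

    let producer = thread::spawn(move || {
        thread::sleep(Duration::from_millis(50));
        sender_clone.send(1).unwrap();
        // `sender_clone` is dropped when this closure returns.
    });

    // Drop the original so that only the clone keeps the channel open.
    drop(sender);

    let mut seen = Vec::new();
    let err = loop {
        match receiver.recv() {
            Ok(value) => seen.push(value),
            Err(e) => break e, // all senders are gone
        }
    };

    producer.join().unwrap();
    (seen, err)
}

fn main() {
    println!("{:?}", try_recv_on_empty_channel());
    let (seen, err) = recv_until_disconnected();
    println!("received {:?}, then {:?}", seen, err);
}
```

If this reading is right, a `while let Ok(..) = receiver.recv()` loop can only finish once no `Sender` for that channel is alive anywhere, including inside the thread doing the receiving.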
Also, any tips on how I can debug this better?
- VS Code debugging shows my threads are still running, but I can't understand why or what for. The stack frames are shown in assembly.
- Tried using `FlameGraph` to visualize where execution is stuck, but it doesn't run on Mac (dtrace: failed to initialize dtrace).
Thanks in advance!
Code snippet:
use rayon::prelude::*;
use std::collections::HashMap;
use std::sync::{mpsc, Arc, Mutex};
use std::*;

fn parallel_compute() -> Result<(), Box<dyn error::Error>> {
    let (sender, receiver) = mpsc::channel();
    let map: Arc<Mutex<HashMap<u32, u32>>> = Arc::new(Mutex::new(HashMap::new()));
    let pool = rayon::ThreadPoolBuilder::new().build()?;

    sender.send(1u32)?;

    let map_clone = map.clone();
    let sender_clone = sender.clone();
    let receiver_thread = thread::spawn(move || {
        while let Ok(id) = receiver.recv() {
            let sender_inner_clone = sender_clone.clone();
            let map_inner_clone = map_clone.clone();
            pool.spawn(move || {
                println!("Processing {}", id);
                map_inner_clone.lock().unwrap().insert(id, id);
                if id < 5 {
                    thread::sleep(time::Duration::from_secs(1));
                    sender_inner_clone.send(id + 1).unwrap();
                }
            });
        }
        drop(pool);
    });

    dbg!(drop(sender));
    receiver_thread.join().unwrap();
    println!("Done processing");
    Ok(())
}

fn main() {
    parallel_compute().unwrap();
}