When I use channel to send data from supportive thread to main thread, the function freezes up

I would like to create a function that would read a csv to columns and in this function I have s thread that takes a line of the csv file and gets the column type while the file is being read and I have the following problems:
when I send something from the handle to the main thread or use handle.join(), the function doesn't go further and return output, but just freezes up. How to send data from the supportive thread to the main thread without function being freezed up?

fn read_csv(path:&str)-> Result<(), csv::Error>{
    let mut input = csv::Reader::from_reader(BufReader::new(File::open(path).unwrap()));
    let mut data = HashMap::new();
    //let mut bitvecs = BitVec::new();
    let headers = input.headers()?.clone();
    let mut rows:usize = 0;
    let mut col_count = 0;
    let (num_cols_tx,num_cols_rx):(Sender<usize>, Receiver<usize>) = mpsc::channel();
    let (tx, rx): (Sender<csv::StringRecord>, Receiver<csv::StringRecord>) = mpsc::channel();
    let (dtype_send,dtype_rcv):(Sender<Vec<String>>, Receiver<Vec<String>>) = mpsc::channel();
    for (i,el) in headers.into_iter().enumerate(){
        col_count+=1;
        data.insert(i,Vec::new());
    }
    num_cols_tx.send(col_count).unwrap();


    let get_type_handle = thread::spawn(move||{
        let num_cols = col_count;
        //num_cols_rx.recv().unwrap();
        let mut dtypes = HashMap::new();
        let mut data_types = Vec::new();
        for i in 0..num_cols{
            data_types.push("".to_string());
            //float->integer->object->category->datetime
            dtypes.insert(i,vec![true,true,true,true,true]);
    
        }
        for recieved in rx{
            for i in 0..num_cols{
                let mut k = dtypes.entry(i).or_insert(Vec::new());
                let el = recieved.get(i).map(|s| s.to_string()).unwrap();
               // println!("{:?}",el);
                if !el.parse::<f64>().is_ok(){
                    k[1] = false;
                    k[0] = false;
                    k[3] = false
                }
                if !el.parse::<isize>().is_ok(){
                    k[0] = false;
                    k[1] = false;
                    k[4] = false;
                }
            }
        }
        for i in 0..num_cols{
            let mut k = dtypes.entry(i).or_insert(Vec::new());
            if k[0]==true&&k[1]==false{
                data_types[i] = "float64".to_string();}
            else if k[0]==true&&k[1]==true{
                    data_types[i] = "isize".to_string();
            }
            else if k[0]==false&&k[3]==false{
                data_types[i] = "object".to_string();
            }
            else{
                data_types[i] = "category".to_string();
            }
        }
        dtype_send.send(data_types);
    
    });



    for line in input.records(){
        rows += 1;
       // println!("{:?}",line);
        let line_un = line.unwrap();
        println!("{:?} {}",line_un,rows);
        tx.send(line_un.clone()).unwrap();
       // data.push(line?);
    for i in 0..col_count{
        let cur = line_un.get(i).map(|s| s.to_string());
        let mut k = data.entry(i).or_insert(Vec::new());
        k.push(cur); 
    }
}

get_type_handle.join().unwrap();
let data_types = dtype_rcv.recv().unwrap();
println!("{:?}",data_types);
   // return Ok(DataFrame::from_hash_map(&columns));
    Ok(())

}

From quick read your code seems blocked at for recieved in rx {. It will wait for the next send while any sender exists.

Drop the sender tx once you have done; the loop then finishes after outstanding receives complete.

1 Like

Thanks, I will try

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.