I would like to create a function that would read a csv to columns and in this function I have s thread that takes a line of the csv file and gets the column type while the file is being read and I have the following problems:
when I send something from the handle to the main thread or use handle.join(), the function doesn't go further and return output, but just freezes up. How to send data from the supportive thread to the main thread without function being freezed up?
fn read_csv(path:&str)-> Result<(), csv::Error>{
let mut input = csv::Reader::from_reader(BufReader::new(File::open(path).unwrap()));
let mut data = HashMap::new();
//let mut bitvecs = BitVec::new();
let headers = input.headers()?.clone();
let mut rows:usize = 0;
let mut col_count = 0;
let (num_cols_tx,num_cols_rx):(Sender<usize>, Receiver<usize>) = mpsc::channel();
let (tx, rx): (Sender<csv::StringRecord>, Receiver<csv::StringRecord>) = mpsc::channel();
let (dtype_send,dtype_rcv):(Sender<Vec<String>>, Receiver<Vec<String>>) = mpsc::channel();
for (i,el) in headers.into_iter().enumerate(){
col_count+=1;
data.insert(i,Vec::new());
}
num_cols_tx.send(col_count).unwrap();
let get_type_handle = thread::spawn(move||{
let num_cols = col_count;
//num_cols_rx.recv().unwrap();
let mut dtypes = HashMap::new();
let mut data_types = Vec::new();
for i in 0..num_cols{
data_types.push("".to_string());
//float->integer->object->category->datetime
dtypes.insert(i,vec![true,true,true,true,true]);
}
for recieved in rx{
for i in 0..num_cols{
let mut k = dtypes.entry(i).or_insert(Vec::new());
let el = recieved.get(i).map(|s| s.to_string()).unwrap();
// println!("{:?}",el);
if !el.parse::<f64>().is_ok(){
k[1] = false;
k[0] = false;
k[3] = false
}
if !el.parse::<isize>().is_ok(){
k[0] = false;
k[1] = false;
k[4] = false;
}
}
}
for i in 0..num_cols{
let mut k = dtypes.entry(i).or_insert(Vec::new());
if k[0]==true&&k[1]==false{
data_types[i] = "float64".to_string();}
else if k[0]==true&&k[1]==true{
data_types[i] = "isize".to_string();
}
else if k[0]==false&&k[3]==false{
data_types[i] = "object".to_string();
}
else{
data_types[i] = "category".to_string();
}
}
dtype_send.send(data_types);
});
for line in input.records(){
rows += 1;
// println!("{:?}",line);
let line_un = line.unwrap();
println!("{:?} {}",line_un,rows);
tx.send(line_un.clone()).unwrap();
// data.push(line?);
for i in 0..col_count{
let cur = line_un.get(i).map(|s| s.to_string());
let mut k = data.entry(i).or_insert(Vec::new());
k.push(cur);
}
}
get_type_handle.join().unwrap();
let data_types = dtype_rcv.recv().unwrap();
println!("{:?}",data_types);
// return Ok(DataFrame::from_hash_map(&columns));
Ok(())
}