Hi,
I'm trying to parallelize writing a CSV. Basically, I have an input file, do some lookups and calculations, and write the results into a new CSV. Since the csv crate reads CSVs row by row, I had the naive idea to parallelize the following way:
- create n threads
- each thread reads the CSV line by line in parallel
- each thread processes only a subset of the rows (doing the calculations for those, ignoring the rest)
- each thread writes its subset to a separate CSV (for full speed)
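To make the intended scheme concrete: every thread scans all rows but keeps only those whose index i satisfies i % n_threads == thread_idx. A minimal, self-contained sketch of just that striping (the `stripe_subsets` name is made up for illustration, it's not in my actual code):

```rust
use std::thread;

/// Illustrative only: split `rows` into `n_threads` disjoint, striped subsets.
/// Thread t keeps exactly the rows whose index i satisfies i % n_threads == t.
fn stripe_subsets(rows: &[&str], n_threads: usize) -> Vec<Vec<(usize, String)>> {
    thread::scope(|s| {
        let handles: Vec<_> = (0..n_threads)
            .map(|t| {
                s.spawn(move || {
                    rows.iter()
                        .enumerate()
                        // every thread scans all rows but keeps only its own stripe
                        .filter(|(i, _)| i % n_threads == t)
                        .map(|(i, row)| (i, row.to_string()))
                        .collect::<Vec<_>>()
                })
            })
            .collect();
        handles.into_iter().map(|h| h.join().unwrap()).collect()
    })
}

fn main() {
    let rows = ["a", "b", "c", "d", "e"];
    // with 2 threads: thread 0 gets indices 0, 2, 4; thread 1 gets indices 1, 3
    println!("{:?}", stripe_subsets(&rows, 2));
}
```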
The problem is that all my subset CSVs have the exact same calculation output, which should not be the case. The full code is here.
The critical piece is the write_subset_to_csv function, or the threading it is embedded in. In the output files I also write the row index, so all subset CSVs can easily be merged and sorted back into the original format afterwards. The row indices differ across the subset CSVs, as they should; only the calculation results are identical.
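To illustrate the merge step I have in mind: since each subset row carries its original row index in the first column, concatenating all subsets and sorting by that index restores the original order. A sketch (the `merge_by_index` helper is hypothetical, not part of my real code):

```rust
/// Hypothetical helper: restore original row order from per-thread subsets.
/// Each entry is (original_row_index, rendered_csv_row).
fn merge_by_index(subsets: Vec<Vec<(u32, String)>>) -> Vec<String> {
    let mut all: Vec<(u32, String)> = subsets.into_iter().flatten().collect();
    all.sort_by_key(|(idx, _)| *idx); // the row index written as the first CSV column
    all.into_iter().map(|(_, row)| row).collect()
}

fn main() {
    let merged = merge_by_index(vec![
        vec![(0, "r0".into()), (2, "r2".into())], // subset from thread 0
        vec![(1, "r1".into()), (3, "r3".into())], // subset from thread 1
    ]);
    println!("{:?}", merged); // rows back in original order
}
```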
I am not sure whether the CSV is somehow not being read separately per thread, or what else is going wrong.
The critical pieces:
```rust
let range: Vec<u32> = (0..available_cores).collect();
std::thread::scope(|s| {
    for thread_idx in range {
        let f_name = filename.clone();
        let groupby_col = env::args().nth(1).expect("groupby_col not provided");
        let count_col = env::args().nth(2).expect("count_col not provided");
        let result_filename = env::args().nth(4).expect("result file_name not provided");
        threads.push(s.spawn({
            let counts = &counts;
            let col_indices = &col_indices;
            let stds = &stds;
            move || {
                write_subset_to_csv(
                    &f_name,
                    &groupby_col,
                    &count_col,
                    thread_idx,
                    available_cores,
                    counts,
                    col_indices,
                    stds,
                    &result_filename,
                )
            }
        }))
    }
});
```
and:
```rust
fn write_subset_to_csv(
    filename: &str,
    groupby_col: &String,
    count_col: &String,
    cpu_core: u32,
    total_cores: u32,
    counts: &HashMap<String, (i32, Decimal)>,
    col_indices: &HashMap<String, usize>,
    stds: &HashMap<String, f64>,
    result_filename: &str,
) {
    let subset_filename: String = format!("{}_{}.csv", result_filename, cpu_core);
    // create results csv with header only
    write_to_file_header(&subset_filename, "index".to_string(), &groupby_col, (&count_col).to_string());
    // Iterate a 3rd time through the rows to calculate zscores on the fly and export into the results csv
    let file = File::open(filename).expect("Could not open file");
    let reader = BufReader::new(file);
    let mut lines = reader.lines();
    // consume the header row so the row count matches the one of our first loop through the csv
    let header_row = lines.next().unwrap().unwrap();
    let _headers: Vec<&str> = header_row.split(',').collect();
    let file = OpenOptions::new()
        .write(true)
        .create(true)
        .append(true)
        .open(&subset_filename)
        .unwrap();
    let mut writer = csv::Writer::from_writer(file);
    let mut row_idx: u32 = cpu_core;
    for line in lines {
        match row_idx % total_cores == cpu_core { // every core handles a different subset of data
            true => {
                let record = line.unwrap();
                let record: Vec<&str> = record.split(',').collect();
                let group_val = record[*col_indices.get(groupby_col).unwrap()].to_string();
                let col_val = Decimal::from_str(record[*col_indices.get(count_col).unwrap()])
                    .unwrap_or_else(|_| Decimal::new(0, 0));
                let group_hash = counts.get(&group_val);
                let mut zscore: f64 = 0.0;
                match group_hash {
                    Some(value) => {
                        // calculate total of deltas from individual values to group mean
                        let sum: f64 = value.1.to_f64().unwrap();
                        let mean = sum / value.0 as f64;
                        let std = stds.get(&group_val);
                        zscore = (col_val.to_f64().unwrap() - mean) / std.unwrap();
                        let zscore_str: String = zscore.to_string();
                        writer.write_record(&[
                            row_idx.to_string(),
                            groupby_col.to_string(),
                            count_col.to_string(),
                            zscore_str,
                        ]);
// (truncated here)
```
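For reference, stripped of the CSV plumbing, the per-row calculation is just a z-score against the group's mean and standard deviation, with the mean derived from the precomputed (count, sum) pair. A standalone version of that arithmetic (the `zscore` helper is hypothetical, for illustration only):

```rust
/// Hypothetical standalone version of the per-row calculation:
/// z = (value - group_mean) / group_std, where the mean comes from
/// the precomputed (count, sum) pair stored per group.
fn zscore(value: f64, group_count: i32, group_sum: f64, group_std: f64) -> f64 {
    let mean = group_sum / group_count as f64;
    (value - mean) / group_std
}

fn main() {
    // group with values 1, 2, 3: count = 3, sum = 6, mean = 2
    println!("{}", zscore(4.0, 3, 6.0, 1.0)); // prints "2"
}
```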
What could be the root cause of this behaviour?