Parallelizing CSV writing yields the same output in all files

Hi,

I'm trying to parallelize writing a CSV. Basically I have an input file, do some lookups and calculations, and write the results into a new CSV. Since the csv crate reads CSVs row by row, I had the naive idea to parallelize in the following way:

  • create n threads
  • in each thread, read the CSV line by line in parallel
  • each thread handles a subset of the data (doing calculations only for its own rows, ignoring the rest)
  • each thread stores its subset in a separate CSV (for full speed)

The problem is that all my subset CSVs have the exact same calculation output (which should not be the case). The full code is here.

The critical piece is the write_subset_to_csv function, or the threading it is embedded in. In the output files I also write the row index, so all subset CSVs can easily be merged and sorted back into the original format afterwards. The row indices do differ across the subset CSVs, as they should; only the calculation results are identical.

I am not sure whether the CSV is somehow not being read separately per thread, or what else is going wrong.

The critical pieces:

let range: Vec<u32> = (0..available_cores).collect();

std::thread::scope(|s| {
    for thread_idx in range {
        let f_name = filename.clone();
        let groupby_col = env::args().nth(1).expect("groupby_col not provided");
        let count_col = env::args().nth(2).expect("count_col not provided");
        let result_filename = env::args().nth(4).expect("result file_name not provided");

        threads.push(s.spawn({
            let counts = &counts;
            let col_indices = &col_indices;
            let stds = &stds;
            move || {
                write_subset_to_csv(
                    &f_name,
                    &groupby_col,
                    &count_col,
                    thread_idx,
                    available_cores,
                    counts,
                    col_indices,
                    stds,
                    &result_filename,
                )
            }
        }));
    }
});

and:

fn write_subset_to_csv(
    filename: &str,
    groupby_col: &String,
    count_col: &String,
    cpu_core: u32,
    total_cores: u32,
    counts: &HashMap<String, (i32, Decimal)>,
    col_indices: &HashMap<String, usize>,
    stds: &HashMap<String, f64>,
    result_filename: &str,
) {
    let subset_filename: String = format!("{}_{}.csv", result_filename, cpu_core);
    // create the results CSV with a header only
    write_to_file_header(&subset_filename, "index".to_string(), &groupby_col, count_col.to_string());

    // iterate a third time through the rows to calculate z-scores on the fly and export them into the results CSV
    let file = File::open(filename).expect("Could not open file");
    let reader = BufReader::new(file);
    let mut lines = reader.lines();
    // consume the header row so the row count matches the one from our first pass through the CSV
    let header_row = lines.next().unwrap().unwrap();
    let _headers: Vec<&str> = header_row.split(',').collect();

    let file = OpenOptions::new()
        .create(true)
        .append(true) // append implies write access
        .open(&subset_filename)
        .unwrap();

    let mut writer = csv::Writer::from_writer(file);
    let mut row_idx: u32 = cpu_core;

    for line in lines {
        match row_idx % total_cores == cpu_core {
            // every core handles a different subset of the data
            true => {
                let record = line.unwrap();
                let record: Vec<&str> = record.split(',').collect();
                let group_val = record[*col_indices.get(groupby_col).unwrap()].to_string();
                let col_val = Decimal::from_str(record[*col_indices.get(count_col).unwrap()])
                    .unwrap_or_else(|_| Decimal::new(0, 0));

                let group_hash = counts.get(&group_val);
                let mut zscore: f64 = 0.0;
                match group_hash {
                    Some(value) => {
                        // derive the group mean from the stored (count, sum) pair
                        let sum: f64 = value.1.to_f64().unwrap();
                        let mean = sum / value.0 as f64;
                        let std = stds.get(&group_val);
                        zscore = (col_val.to_f64().unwrap() - mean) / std.unwrap();
                        let zscore_str: String = zscore.to_string();
                        writer
                            .write_record(&[
                                row_idx.to_string(),
                                groupby_col.to_string(),
                                count_col.to_string(),
                                zscore_str,
                            ])
                            .unwrap();

(truncated here)

What could be the root cause of such behaviour?

You shouldn't be opening the same file from multiple threads. If you do this without locking, you will get mixed-up garbage. If you do this with locking (including file-system level locks), you will get single-threaded performance.

In your case result_filename is shared by all threads, so they all fight for this file. append does not guarantee graceful behavior.

Write to separate Vecs and then write the Vecs serially to the file.
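For example, roughly like this (a sketch only: it assumes the rows were already read into a lines: Vec<String>, and process_row is a placeholder for the lookup/z-score logic):

use std::io::Write;

// each thread fills its own Vec<u8>; nothing is shared while writing
let buffers: Vec<Vec<u8>> = std::thread::scope(|s| {
    let handles: Vec<_> = (0..available_cores)
        .map(|core| {
            let lines = &lines;
            s.spawn(move || {
                let mut buf: Vec<u8> = Vec::new();
                for (row_idx, line) in lines.iter().enumerate() {
                    if row_idx as u32 % available_cores == core {
                        // process_row stands in for the per-row calculation
                        writeln!(buf, "{},{}", row_idx, process_row(line)).unwrap();
                    }
                }
                buf
            })
        })
        .collect();
    handles.into_iter().map(|h| h.join().unwrap()).collect()
});

// a single writer puts the buffers into the file serially
let mut out = File::create(result_filename).unwrap();
for buf in &buffers {
    out.write_all(buf).unwrap();
}

Since each row still carries its index, the merged output can be sorted back into the original order afterwards.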


Could you explain why opening the same file in multiple threads does not work?

I naively imagined that I could:

  • either open the file separately in multiple threads,
  • or pass each row, once read, into multiple threads.

Are both options impossible/not recommended? Is there a good way to process a CSV with multiple threads?

I tried to read the CSV sequentially, but this solution is super slow unfortunately...

let range: Vec<u32> = (0..available_cores).collect();

let mut row_idx: u32 = 0;
for line in lines {
    let record = line.unwrap();
    let record: Vec<&str> = record.split(',').collect();

    std::thread::scope(|s| {
        for thread_idx in &range {
            let f_name = filename.clone();
            let groupby_col = env::args().nth(1).expect("groupby_col not provided");
            let count_col = env::args().nth(2).expect("count_col not provided");
            let result_filename = env::args().nth(4).expect("result file_name not provided");

            s.spawn({
                let counts = &counts;
                let col_indices = &col_indices;
                let stds = &stds;
                let record = &record;
                move || {
                    write_subset_to_csv(
                        &f_name,
                        &groupby_col,
                        &count_col,
                        &row_idx,
                        record,
                        *thread_idx,
                        available_cores,
                        counts,
                        col_indices,
                        stds,
                        &result_filename,
                    )
                }
            });
        }
    });
    row_idx += 1;
}

I wonder if there would be any hope in reading sequentially and spawning a thread for each row... However, I would then have to limit the number of spawned threads in the pool at any given time, I guess...
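For example, a thread-pool crate such as rayon might cap the number of threads for me. A rough sketch of the idea (untested, and compute_zscore is just a placeholder for my calculation):

use rayon::iter::{ParallelBridge, ParallelIterator};
use std::io::{BufRead, BufReader};

let reader = BufReader::new(File::open(filename).unwrap());
// rayon's global pool bounds the number of worker threads automatically
let mut results: Vec<(usize, f64)> = reader
    .lines()
    .skip(1) // skip the header row
    .enumerate()
    .par_bridge() // hand rows to the pool as they are read
    .map(|(row_idx, line)| (row_idx, compute_zscore(&line.unwrap())))
    .collect();
// par_bridge does not preserve order, so sort by row index before writing
results.sort_by_key(|&(row_idx, _)| row_idx);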

This is because every writer thinks it has exclusive access to the file and never considers that anyone else may be writing at the same time. Multiple writers will overwrite/mix/jumble the file data, because they do not cooperate. This is bad for data integrity, and it's bad for performance (the OS still does some locking, but at a level where it only hurts performance and is too late to prevent data-integrity issues).

This is a very fragile, bad design, and you must avoid it. Only ever have one writer writing to a file.

In Rust you can treat a &mut Vec<u8> the same way as a File, because they both implement io::Write. Therefore, write to a &mut Vec<u8> from your threads (one per thread, not shared!).
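With the csv crate this works directly, since csv::Writer accepts any io::Write, including a plain Vec<u8>. A small sketch (into_inner flushes and returns the underlying buffer):

// inside each thread: write records into an in-memory buffer
let mut wtr = csv::Writer::from_writer(Vec::new());
wtr.write_record(&["index", "zscore"]).unwrap();
wtr.write_record(&["0", "1.5"]).unwrap();
// take the bytes back out; a single thread later appends all buffers to the file
let bytes: Vec<u8> = wtr.into_inner().unwrap();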

