Iterators over CSV files with iproduct

I have to compute a cartesian product between two datasets, calculating some statistical properties along the way (the details are not important for this example). The entire workflow uses iterators (function all_vs_all_with_iterators) except for reading the CSV files (function get_df). Here is the working code:

use std::cmp::Ordering;
use csv::ReaderBuilder;
use itertools::iproduct;
use rgsl::{randist::beta::beta_P, statistics::correlation};
use serde::{Serialize, Deserialize};
extern crate external_sort;
use external_sort::{ExternalSorter, ExternallySortable};
use std::time::Instant;

type Matrix = Vec<Vec<f64>>;

#[derive(Clone, PartialEq, Serialize, Deserialize, Debug)]
struct CorResult {
    r: f64,
    p_value: f64,
    p_value_adjusted: Option<f64>
}

impl Eq for CorResult { }

impl Ord for CorResult {
    // Sorts in descending order
    fn cmp(&self, other: &Self) -> Ordering {
        self.partial_cmp(other).unwrap()
    }
}

impl PartialOrd for CorResult {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        self.p_value.partial_cmp(&other.p_value)
    }
}

impl ExternallySortable for CorResult {
    fn get_size(&self) -> u64 { 1 }
}

fn all_vs_all_with_iterators(m1: Matrix, m3: Matrix) {
    let n = m1[0].len(); // Number of columns (used by the removed computations)

    let total_number_of_elements = (m1.len() * m3.len()) as u64;

    let correlations_and_p_values = iproduct!(m1, m3).map(|(tuple1, tuple3)| {
        // Correlation
        let r = 10.0; // Removed complex computation

        // P-value
        let p_value = 5.0; // Removed complex computation
        CorResult { r, p_value, p_value_adjusted: None }
    });

    // Sorting
    let external_sorter: ExternalSorter<CorResult> = ExternalSorter::new(total_number_of_elements, None);
    let sorted = external_sorter.sort(correlations_and_p_values).unwrap();

    // Ranking 
    let ranked = sorted.enumerate();

    // Filtering
    let correlation_threshold = 0.7;
    let filtered = ranked.filter(|(_, cor_and_p_value)| cor_and_p_value.as_ref().unwrap().r.abs() >= correlation_threshold);
    
    // Adjustment
    let mut previous_value = 999999.0;
    let adjusted = filtered.map(|(rank, value)| {
        // Some stuff with previous_value
        value
    });

    println!("Final count -> {}", adjusted.count());
}

fn get_df(path: &str) -> Matrix {
    // Build the CSV reader and iterate over each record.
    let mut rdr =  ReaderBuilder::new()
        .delimiter(b'\t')
        .from_path(path).unwrap();
    
    rdr.records().map(|result| {
        let record = result.unwrap();
        record.iter().map(|x| x.parse::<f64>().expect(x)).collect::<Vec<f64>>()
    }).collect()
}

fn main() {
    let m1 = get_df("df1.csv");
    let m3 = get_df("df2.csv");
    
    all_vs_all_with_iterators(m1, m3);
}

The problem is that the program consumes too much memory (my real datasets can have thousands of rows and hundreds of columns), so I was trying to read both datasets lazily, but iproduct requires the Clone trait:

let rdr = ReaderBuilder::new()
    .delimiter(b'\t')
    .from_path(path).unwrap();

// Rows are still read lazily; each row is parsed eagerly into a Vec<f64>.
let m1 = rdr.into_records().map(|row| {
    row.unwrap().iter().map(|x| x.parse::<f64>().expect(x)).collect::<Vec<f64>>()
});

But then iproduct fails with:

the trait std::clone::Clone is not implemented for csv::StringRecordsIntoIter<std::fs::File>

To replicate, these are my dependencies:

itertools = "0.9.0"
GSL = "2.0.1"
external_sort = "^0.1.1"
serde = { version = "1.0", features = ["derive"] }
csv = "1.1"

Both testing datasets can be found here.

Is there a way to use iproduct with lazily read rows? Any kind of help would be really appreciated.

The cartesian product of two iterators of length n will have n² items: at least one of the iterators needs to be restarted n times, which is what the Clone bound on the iterator is for. The items themselves also need to implement Clone so that a single item can be paired with multiple items from the other iterator.
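
For intuition, here is a tiny illustration (untested, with made-up numbers) using slice iterators, which do implement Clone:

use itertools::Itertools;

fn main() {
    let left = vec![1, 2, 3];
    let right = vec![10, 20];
    // The right-hand iterator is cloned and restarted once per left-hand item,
    // and each left-hand item is cloned so it can be paired with every
    // right-hand item.
    for (a, b) in left.iter().cartesian_product(right.iter()) {
        println!("{} {}", a, b); // prints 3 * 2 = 6 pairs
    }
}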

File-based iterators can't be cloned because the underlying File can't be cloned. (And it looks like the csv crate never implements Clone for its iterators, regardless of the underlying storage.)

The cartesian_product method that iproduct! uses only requires the right-hand iterator to be cloneable, so the first thing I'd try is to make it half-lazy: collect the right-hand dataset into a Vec but treat the left-hand one lazily.
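
For your code, a minimal half-lazy sketch (untested) could look like the following; read_rows is a made-up helper that does what your get_df does but returns an iterator instead of a Vec:

use csv::ReaderBuilder;
use itertools::iproduct;

// Hypothetical helper: like get_df, but streams rows instead of collecting them.
fn read_rows(path: &str) -> impl Iterator<Item = Vec<f64>> {
    ReaderBuilder::new()
        .delimiter(b'\t')
        .from_path(path)
        .unwrap()
        .into_records()
        .map(|row| {
            row.unwrap()
                .iter()
                .map(|x| x.parse::<f64>().unwrap())
                .collect::<Vec<f64>>()
        })
}

fn main() {
    // The left-hand side stays lazy; only the second dataset is held in memory.
    let m1 = read_rows("df1.csv");
    let m3: Vec<Vec<f64>> = read_rows("df2.csv").collect();

    // A Vec's iterator implements Clone, so iproduct! accepts it on the right.
    println!("{}", iproduct!(m1, m3).count());
}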

Alternatively, you could try to wrap csv's iterator in something that implements Clone in a custom way. For instance, you could try something like this (untested sketch):

struct IterWrapper {
    filename: String,
    inner: csv::StringRecordsIntoIter<std::fs::File>,
}

impl Iterator for IterWrapper {
    type Item = csv::Result<csv::StringRecord>;
    fn next(&mut self) -> Option<Self::Item> {
        self.inner.next()
    }
}

impl Clone for IterWrapper {
    fn clone(&self) -> Self {
        // Open the same file again and fast-forward the fresh reader to the
        // position the original iterator has reached, so both copies can
        // advance independently.
        let mut new_inner = csv::ReaderBuilder::new()
            .delimiter(b'\t')
            .from_path(&self.filename)
            .unwrap()
            .into_records();
        new_inner
            .reader_mut()
            .seek(self.inner.reader().position().clone())
            .unwrap();
        IterWrapper {
            filename: self.filename.clone(),
            inner: new_inner,
        }
    }
}
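
With that wrapper, both sides can stream from disk. One detail: the product also clones the items it yields, and csv::Error is not Clone, so it's easiest to parse each record into a Vec<f64> (which is Clone) before taking the product. Roughly (untested; IterWrapper::new is a made-up constructor that opens the file):

use itertools::iproduct;

impl IterWrapper {
    fn new(path: &str) -> Self {
        let inner = csv::ReaderBuilder::new()
            .delimiter(b'\t')
            .from_path(path)
            .unwrap()
            .into_records();
        IterWrapper { filename: path.to_string(), inner }
    }
}

fn parse_row(row: csv::Result<csv::StringRecord>) -> Vec<f64> {
    row.unwrap().iter().map(|x| x.parse::<f64>().unwrap()).collect()
}

fn main() {
    // Both datasets are streamed; the right-hand wrapper is cloned
    // (re-opened and re-read) once per row of the left-hand dataset.
    let m1 = IterWrapper::new("df1.csv").map(parse_row);
    let m3 = IterWrapper::new("df2.csv").map(parse_row);
    for (row1, row3) in iproduct!(m1, m3) {
        // Compute the statistics on (row1, row3) here.
        let _ = (row1, row3);
    }
}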

You're the best! Thank you so much! I had no idea that only the right-side parameter needs to be cloneable.
About the Clone implementation for my custom struct: why should I make a copy of the file instead of just reading the file again? Since it's only for reading, do you think it will lock the entire file?

"Read the file again" is probably closer to what I had in mind. You need to leave the original File object undisturbed so that the old iterator continues to work properly (without skipping or repeating any entries), so your clone implementation needs to call File::open() again.

Or, use something like the memmap crate and use that mapping to feed the csv crate instead of File objects. (I've never worked with memory mapped I/O in Rust, so I don't know how easy/hard this would be).
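
A rough, untested sketch of what that could look like with the memmap crate: the mapping derefs to a byte slice, and &[u8] implements io::Read, so the csv reader can consume it directly.

use std::fs::File;
use csv::ReaderBuilder;
use memmap::Mmap;

fn main() {
    let file = File::open("df1.csv").unwrap();
    // Safety: the file must not be modified while the mapping is alive.
    let mmap = unsafe { Mmap::map(&file).unwrap() };

    // Building a reader over the mapping is cheap, so a Clone-able wrapper
    // could create a fresh reader over the same bytes instead of re-opening
    // and seeking the file.
    let mut rdr = ReaderBuilder::new()
        .delimiter(b'\t')
        .from_reader(&mmap[..]);
    println!("{} records", rdr.records().count());
}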
