I've to make a cartesian product between two datasets computing some statistical properties (which are not important for this example). I'm using the entire workflow with iterators (function all_vs_all_with_iterators
) except the reading of the CSV (function get_df
), here is the working code:
use std::{cmp::Ordering};
use csv::ReaderBuilder;
use itertools::iproduct;
use rgsl::{randist::beta::beta_P, statistics::correlation};
use serde::{Serialize, Deserialize};
extern crate external_sort;
use external_sort::{ExternalSorter, ExternallySortable};
use std::time::Instant;
type Matrix = Vec<Vec<f64>>;
#[derive(Clone, PartialEq, Serialize, Deserialize, Debug)]
struct CorResult {
r: f64,
p_value: f64,
p_value_adjusted: Option<f64>
}
impl Eq for CorResult { }
impl Ord for CorResult {
// Sorts in descending order
fn cmp(&self, other: &Self) -> Ordering {
self.partial_cmp(&other).unwrap()
}
}
impl PartialOrd for CorResult {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
self.p_value.partial_cmp(&other.p_value)
}
}
impl ExternallySortable for CorResult {
fn get_size(&self) -> u64 { 1 }
}
fn all_vs_all_with_iterators(m1: MatrixWithIterators, m3: MatrixWithIterators) {
let n = m1[0].len();
let total_number_of_elements = (m1.count() * m3.count()) as u64;
let correlations_and_p_values = iproduct!(m1, m3).map(|(tuple1, tuple3)| {
// Correlation
let r = 10; // Removed complex computation
// P-value
let p_value = 5; // Removed complex computation
CorResult{r, p_value, p_value_adjusted: None}
});
// Sorting
let external_sorter: ExternalSorter<CorResult> = ExternalSorter::new(total_number_of_elements, None);
let sorted = external_sorter.sort(correlations_and_p_values).unwrap();
// Ranking
let ranked = sorted.enumerate();
// Filtering
let correlation_threhold = 0.7;
let filtered = ranked.filter(|(_, cor_and_p_value)| cor_and_p_value.as_ref().unwrap().r.abs() >= correlation_threhold);
// Adjustment
let mut previous_value = 999999.0;
let adjusted = filtered.map(|(rank, value)| {
// Some stuff with previous_value
value
});
println!("Final count -> {}", adjusted.count());
}
fn get_df(path: &str) -> Matrix {
// Build the CSV reader and iterate over each record.
let mut rdr = ReaderBuilder::new()
.delimiter(b'\t')
.from_path(path).unwrap();
rdr.records().map(|result| {
let record = result.unwrap();
record.iter().map(|x| x.parse::<f64>().expect(x)).collect::<Vec<f64>>()
}).collect()
}
fn main() {
let m1 = get_df("df1.csv");
let m3 = get_df("df2.csv");
all_vs_all(m1, m3);
}
The problem is the program is consuming too much memory (my real datasets could be of thousands of rows with hundred of columns), so I was trying to read both datasets lazily but iproduct needs the trait Clone
:
let rdr = ReaderBuilder::new()
.delimiter(b'\t')
.from_path(path).unwrap();
let m1 = rdr.into_records().map(|row| row.unwrap().iter().map(|x| x.parse::<f64>().expect(x)));
But in iproduct it's throwing:
the trait
std::clone::Clone
is not implemented forcsv::StringRecordsIntoIter<std::fs::File>
To replicate, these are my dependencies:
itertools = "0.9.0"
GSL = "2.0.1"
external_sort = "^0.1.1"
serde = { version = "1.0", features = ["derive"] }
csv = "1.1"
And both testing dataset can be found here
Is there a way to use iproduct
with lazy read rows? Any kind of help would be really appreciated