The function below uses par_apply to set the value of mutable arrays using shared access to vec_of_closes, vec_of_lows, vec_of_highs. However, i am getting funny data back. This compiles, but clearly something is wrong.
Should those sections be put in a mutex? Any other suggestions?
Grateful in advance.
fn sampling_threaded_hlc(vec_of_closes: &[f64],vec_of_highs: &[f64],vec_of_lows: &[f64], vec_of_dates: &[&str], vec_of_times: &[&str]) -> DataFrame {
let steps = vec_of_times.len() - 1;
let mut lows = Array1::<f64>::zeros(steps);
let mut highs = Array1::<f64>::zeros(steps);
let mut closes = Array1::<f64>::zeros(steps);
let back_times = &vec_of_times[..vec_of_times.len()-1];
let front_times = &vec_of_times[1..vec_of_times.len()];
let first_back = back_times[0];
let last_front = front_times[front_times.len()-1];
Zip::from(&mut closes).and(&mut lows).and(&mut highs).and(&*back_times).and(&*front_times).par_apply(|closes,lows,highs,back_times,front_times| {
// println!("{:?},{:?}",back_times,front_times);
let first = if *back_times == first_back {
0
}
else {
vec_of_dates.par_iter().position_first(|x| NaiveDateTime::parse_from_str(&x,"%Y-%m-%d %H:%M:%S").unwrap() > NaiveDateTime::parse_from_str(&back_times,"%Y-%m-%d %H:%M:%S").unwrap()).unwrap()
};
let last = if *front_times == last_front {
steps
}
else {
vec_of_dates.par_iter().position_first(|x| NaiveDateTime::parse_from_str(&x,"%Y-%m-%d %H:%M:%S").unwrap() > NaiveDateTime::parse_from_str(&front_times,"%Y-%m-%d %H:%M:%S").unwrap()).unwrap()
};
*lows = vec_of_lows[first..last].iter().fold(f64::INFINITY, |a,&b| a.min(b));
*highs = vec_of_highs[first..last].iter().fold(0_f64, |a,&b| a.max(b));
*closes = vec_of_closes[first..last].last().copied().unwrap();
println!("{:?},{:?},{:?},{:?},{:?},{:?},{:?}",back_times,front_times,last,first,lows,highs,closes);
});
let vec_dates = Series::new("date",&vec_of_times[1..]);
let vec_lows = Series::new("low",fill_forward(&lows.to_vec()));
let vec_highs = Series::new("high",fill_forward(&highs.to_vec()));
let vec_closes = Series::new("close",fill_forward(&closes.to_vec()));
let df = DataFrame::new(vec![vec_dates,vec_lows,vec_highs,vec_closes]).unwrap();
println!("{:?}",df.head(Some(10)));
df
}