Parallelize a working linear code

I have a working code; the main part of it is given below. I want to run the for loop over path_list in parallel. It works in parallel in Python. The problem is that in Rust I don't know the type of the variable doublebuffer, which is supposed to be a collection of tuples. Does anybody know how to do this using rayon and par_iter()? Would it be better to use a HashMap?

for filename in path_list {
    let mut doublebuffer = fitsread(filename).expect("some error in for loop");
}

fn fitsread(filename: std::path::PathBuf) -> Result<(f64, f64), Error> {
    let mut fptr = FitsFile::open(filename)?;
    let hdu = fptr.hdu(0)?;
    let mjd: f64 = hdu.read_key(&mut fptr, "MJD-OBS")?;
    let index = 45.5; // after some calculations on the data of the FITS file named filename
    Ok((mjd, index))
}

The type of doublebuffer is (f64, f64). That is clear from the type signature of fitsread().

It is usually just a case of calling par_iter() (or a similar method for the ownership and mutability you want, such as into_par_iter() or par_iter_mut()), then calling iterator methods on it (ParallelIterator has most of the same methods as Iterator). Of course, you can't use for loops with a ParallelIterator, but you can call for_each() with a closure accepting filename, as in the sketch below.
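A minimal sketch (assuming rayon is already a dependency; since your fitsread() takes a PathBuf by value and par_iter() yields &PathBuf, this clones each path):

use rayon::prelude::*;

// Parallel version of the for loop: for_each() runs the closure on the
// rayon thread pool. Nothing is collected in this form.
path_list.par_iter().for_each(|filename| {
    let doublebuffer = fitsread(filename.clone()).expect("some error in for loop");
    // use doublebuffer here
});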

Both Vec and HashMap have par_iter() and the interface is much the same for both, so the answer to this is likely to depend on what you are doing with it elsewhere in your code.

It was almost exactly for cases like this that I wrote dpc-pariter. In particular, if your path_list is an iterator returned by something like jwalk, ignore, or walkdir, it will allow you to start processing the paths in parallel without ever having to collect all of them (and thus without having to wait for the fs walker to complete).


Yes, currently the type of doublebuffer is (f64, f64). But after converting it into parallel code it will collect multiple (f64, f64) values. So the following code does not work. Any solution?

    let mut ans=path_list.par_iter().map(|&filename| fitsread(filename)).collect::<(f64,f64)>().expect("some error");

https://play.rust-lang.org/?version=stable&mode=debug&edition=2021&gist=3cf7a2ea416d8901263890d70f5b6028

Something like this, and then after you get it working, change map to parallel_map from dpc-pariter.

That is somewhat different to the code in your original post, where you aren't collecting the results. The two main things needed to make it work are that you need to collect into a collection type like Vec<(f64, f64)>, and you need to decide where you want to handle the Result. If you want to unwrap after every call to fitsread() like in the first post, then that would be:

let mut ans = path_list
    .par_iter()
    .map(|&filename| fitsread(filename).expect("some error"))
    .collect::<Vec<(f64, f64)>>();

If you want to return a Result from the whole thing and then expect() it, then you need to collect into a Result:

let mut ans = path_list
    .par_iter()
    .map(|&filename| fitsread(filename))
    .collect::<Result<Vec<(f64, f64)>, Error>>()
    .expect("some error");

The iterator combinators work similarly for the sequential case.

One issue you may run into is with the ownership of filename. fitsread() should either take a &Path, since you probably don't need to take ownership, or you should consume the list with into_par_iter() to get owned PathBufs.
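For illustration, the borrowed-path variant could look like this (a sketch; only the signature changes, since FitsFile::open accepts anything path-like):

use std::path::Path;

// Takes &Path instead of PathBuf: path_list.par_iter() can then pass
// &PathBuf directly (it deref-coerces to &Path) without cloning.
fn fitsread(filename: &Path) -> Result<(f64, f64), Error> {
    let mut fptr = FitsFile::open(filename)?;
    let hdu = fptr.hdu(0)?;
    let mjd: f64 = hdu.read_key(&mut fptr, "MJD-OBS")?;
    let index = 45.5; // same calculations as before
    Ok((mjd, index))
}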


After a few tweaks, your parallel code worked like a charm. Thanks a lot. I am getting a 20% speed gain using the Rust release build as opposed to Python for the equivalent parallel code. I was expecting better.
This may be because the Rust release code only uses around 40% CPU on all threads. Does anybody know why this may be happening? The parallel Python code and the Rust code without --release use 100% CPU but are slow.

I have not studied the presented code closely, but it seems to me that no matter how much you parallelize the code, each parallel thread is hammering on the same underlying file system to read data. The bottleneck is the file system and the media the files are on, rather than your parallel code.

That you say "rust release code only uses around 40% CPU on all threads" hints to me that all those threads are waiting for file access.


Seems like you are right about waiting for file access. I used an NVMe SSD for the above-mentioned findings. But when I ran the parallel Python code on an HDD, it showed only 20% CPU usage on all threads and ran very, very slowly.

Is there any approach for reading the data from the files efficiently? During the Rust release run, swap gradually increases to 2.5 GB.

Wow, how many godzilla bytes are these files you are reading?

If you want to avoid using swap, which is probably a good idea, then you should read and process small chunks at a time.

Trying to parallelise this reading will require memory, hence swap space. So now you are hitting the disk to read the file and hitting the same disk to stash the memory it occupies into swap space.


Limiting the amount of parallelism you do will actually often help. If you're reading 100 files at a time, the filesystem will try not to starve any of them, and thus will jump around all over to load them -- which will go particularly poorly on a spinning-platter HDD. But reading a single file from start to end without interruption is what they're best at.

You might try serially walking the directory and fs::reading the files, then spinning up a thread per file to process the file in parallel while you keep reading more from disk serially.

(If you have lots of files or the work per file is heavy enough, then you'd want something smarter than just thread-per-file. But since you said you're not saturating CPU right now anyway, the basic thread-per-file approach is easy to start with -- you can always use something fancier like a rayon::scope later if needed.)
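A rough sketch of that pattern (process() here is a hypothetical stand-in for your per-file computation):

use std::{fs, path::PathBuf, thread};

fn run(path_list: Vec<PathBuf>) {
    let mut handles = Vec::new();
    for path in path_list {
        // Serial read: only one file is being pulled off the disk at a time.
        let bytes = fs::read(&path).expect("read failed");
        // Parallel processing: hand the bytes to a fresh thread.
        handles.push(thread::spawn(move || process(&bytes)));
    }
    for handle in handles {
        handle.join().expect("worker thread panicked");
    }
}

fn process(_bytes: &[u8]) {
    // placeholder for the per-file calculation
}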


I am reading 350 GB of FITS files. I have 16 GB of RAM, but only 30% is used in total while running the code.

I think it is one file (reading and processing) per thread, and it frees the memory for the file data after the calculation for each file. I have attached the code. The length of the vectors Xall, Yall, flux, and wave_vac is 282624. There are 34550 FITS files totalling 350 GB. I am running the Rust release build on an NVMe SSD. I also ran the code after reducing the sizes of Xall and Yall (by 1/6) by using a subset of flux and wave_vac, but it did not improve the speed and did not reduce swap usage.

use fitsio::{errors::Error, FitsFile};
use glob::glob;
use rayon::prelude::*;

fn main() {
    let mut path_list: Vec<_> = Vec::new();
    for entry in glob("**/*S1D_A.fits").expect("Failed to read glob pattern") {
        match entry {
            Ok(path) => path_list.push(path),
            Err(e) => println!("{:?}", e),
        };
    }
    let ans = path_list
        .into_par_iter()
        .map(fitsread)
        .collect::<Result<Vec<(f64, f64)>, Error>>()
        .expect("some error");
    println!("{:?}", ans);
}


fn trapInt(Xdata: Vec<f64>, Ydata: Vec<f64>) -> f64 {
    // trapezoidal rule with a uniform step inferred from the X grid
    let width = Xdata[2] - Xdata[1];
    (0..(Xdata.len() - 1))
        .map(|i| width * (Ydata[i + 1] + Ydata[i]) / 2.0)
        .sum()
}

fn fitsread(filename: std::path::PathBuf) -> Result<(f64, f64), Error> {
    let mut fptr = FitsFile::open(filename)?;
    let hdu0 = fptr.hdu(0)?;
    let mjd: f64 = hdu0.read_key(&mut fptr, "MJD-OBS")?;
    let hdu1 = fptr.hdu(1)?;
    let flux: Vec<f64> = hdu1.read_col(&mut fptr, "flux")?;
    let wave_vac: Vec<f64> = hdu1.read_col(&mut fptr, "wavelength")?;

    let _rv = -1.6; // need to change later
    let wave_air = wave_vac;
    let Xall = /*(abs(rv)*1000.0/c1)*wave_air +*/ wave_air;
    let Yall = flux; // pixel_size_s2d;

    let mut Xlow_index = 0;
    let mut Xhigh_index = 0;
    let mut Xlow_left_index = 0;
    let mut Xhigh_left_index = 0;
    let mut Xlow_right_index = 0;
    let mut Xhigh_right_index = 0;

    // Find the indices where the wavelength grid crosses each band edge.
    // The loops start at 1 so that Xall[j - 1] cannot underflow at j == 0.
    for j in 1..Xall.len() {
        if Xall[j] >= 4278.0 && Xall[j - 1] < 4278.0 {
            Xlow_left_index = j;
        }
        if Xall[j] > 4286.0 && Xall[j - 1] <= 4286.0 {
            Xhigh_left_index = j;
        }
    }
    for i in 1..Xall.len() {
        if Xall[i] >= 4302.0 && Xall[i - 1] < 4302.0 {
            Xlow_index = i;
        }
        if Xall[i] > 4311.0 && Xall[i - 1] <= 4311.0 {
            Xhigh_index = i;
        }
    }
    for k in 1..Xall.len() {
        if Xall[k] >= 4319.0 && Xall[k - 1] < 4319.0 {
            Xlow_right_index = k;
        }
        if Xall[k] > 4322.0 && Xall[k - 1] <= 4322.0 {
            Xhigh_right_index = k;
        }
    }

    // Integrate the central band and the two reference bands.
    let Xdata_left = Vec::from(&Xall[Xlow_left_index..=Xhigh_left_index]);
    let Ydata_left = Vec::from(&Yall[Xlow_left_index..=Xhigh_left_index]);
    let TotalInt_left = trapInt(Xdata_left, Ydata_left);

    let Xdata = Vec::from(&Xall[Xlow_index..=Xhigh_index]);
    let Ydata = Vec::from(&Yall[Xlow_index..=Xhigh_index]);
    let TotalInt = trapInt(Xdata, Ydata);

    let Xdata_right = Vec::from(&Xall[Xlow_right_index..=Xhigh_right_index]);
    let Ydata_right = Vec::from(&Yall[Xlow_right_index..=Xhigh_right_index]);
    let TotalInt_right = trapInt(Xdata_right, Ydata_right);

    let index = 2.0 * TotalInt / (TotalInt_left + TotalInt_right);
    Ok((mjd, index))
}

Between the two approaches to error handling mentioned by @jameseb7, which one is faster when reading speed is the bottleneck? Any suggestions for improving the speed?

If file reading speed is the bottleneck, then I suspect error handling is not something you need to worry about.

Somehow you have to measure what is going on to find out where your problem is.

Performance tip: your Vec::from calls are copying data into new vectors, and that copying is easy to avoid. Just rewrite trapInt so that it takes slices as inputs (reading the data in place); then you can pass the slices directly instead of making new vectors. [Brief rule of thumb: a function like trapInt that only reads data should never take ownership of (consume) its parameters, the vectors. It should take a reference, &[f64] in this case. Imagine the function description: "I can only do this arithmetic if you also make me some memory to deallocate" (??) :slightly_smiling_face:]
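For example, a slice-taking trapInt could look like this (a sketch; the arithmetic is unchanged, only the parameter types differ):

#[allow(non_snake_case)]
fn trapInt(Xdata: &[f64], Ydata: &[f64]) -> f64 {
    // Reads the data in place: nothing is allocated, nothing is deallocated.
    let width = Xdata[2] - Xdata[1];
    (0..(Xdata.len() - 1))
        .map(|i| width * (Ydata[i + 1] + Ydata[i]) / 2.0)
        .sum()
}

The call sites then pass the slices directly, e.g. trapInt(&Xall[Xlow_index..=Xhigh_index], &Yall[Xlow_index..=Xhigh_index]), and the Vec::from copies disappear.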

I can only guess how long the vectors are. If they are short, then this doesn't matter at all compared with the I/O overhead; if they are long, then it helps everything.

You're writing a lot of indexing code. Accessing elements by index is bounds checked in Rust (all compilation modes) and it's preferred for this reason (and other reasons) to write loops in terms of elements, not indexes, if possible. It's not that often this makes a measurable difference, but for numerical computation it's definitely something to watch.
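For example, the per-trapezoid loop can iterate over adjacent element pairs with windows(2) instead of indexing (a sketch, with trap_sum as a hypothetical helper):

// Element-based: windows(2) yields each pair of adjacent samples, so the
// body never indexes into the full slice and the remaining bounds checks
// are easy for the optimizer to elide.
fn trap_sum(width: f64, ydata: &[f64]) -> f64 {
    ydata
        .windows(2)
        .map(|w| width * (w[1] + w[0]) / 2.0)
        .sum()
}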

Try to find a working set size so that you can run through the whole computation without using swap, hopefully you can see a big performance increase with that.


I couldn't help but look at trapInt and think how I'd like to write it. It's a trapezoidal rule numerical integration function. If we write out the sum:

integral = Sum[from 0 to N-1]  dx * (y[i + 1] + y[i]) / 2

we can algebraically simplify that to:

integral = dx/2 Sum[from 0 to N-1]  y[i + 1] + y[i]
integral = dx/2 (Sum[from 0 to N-1]  y[i]  + Sum[from 1 to N]  y[i])

and simplify again to:

integral = 0
integral += y[0] / 2
integral += y[N] / 2
integral += Sum[from 1 to N-1]  y[i]
integral *= dx

and now we've isolated a simple summation loop over just (most of) the elements in the Y vector, which uses fewer operations already but is also easier for the compiler to optimize.

I've written that code here, as trapInt2. For simplicity, this code doesn't handle the case where the length of Y is less than 2. The inline(never) is just a remnant from me looking at the compiler output briefly.
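Reconstructed from the derivation above, trapInt2 would look roughly like this sketch:

#[allow(non_snake_case)]
#[inline(never)]
fn trapInt2(Xdata: &[f64], Ydata: &[f64]) -> f64 {
    // Assumes Ydata.len() >= 2 and a uniform grid spacing dx.
    let dx = Xdata[2] - Xdata[1];
    let n = Ydata.len() - 1;
    // Endpoints contribute half weight; interior points contribute full weight.
    let mut integral = (Ydata[0] + Ydata[n]) / 2.0;
    integral += Ydata[1..n].iter().sum::<f64>();
    integral * dx
}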

When looking at it, you see .sum::<f64>() and think you'd like that to be a vectorized, faster loop. So I dropped in ndarray's sum, which does exactly this on borrowed data, very fluently(!), in trapInt3. Ndarray uses autovectorization; it just writes the loop in a different way for better optimization. I'm sure there are other crates that can do comparable or better, and it's by no means a necessary crate.
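And trapInt3 would differ only in the interior sum, roughly like this (a sketch assuming ndarray's ArrayView1):

use ndarray::ArrayView1;

#[allow(non_snake_case)]
fn trapInt3(Xdata: &[f64], Ydata: &[f64]) -> f64 {
    let dx = Xdata[2] - Xdata[1];
    let n = Ydata.len() - 1;
    let mut integral = (Ydata[0] + Ydata[n]) / 2.0;
    // Borrow the interior elements as a 1-D ndarray view; its sum() is
    // written so the compiler can autovectorize it.
    integral += ArrayView1::from(&Ydata[1..n]).sum();
    integral * dx
}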


You really want to avoid collecting the paths into a Vec first, and instead work on the iterator directly. Try the code I've posted.

You should tune the number of threads. My bet is that having a lot of threads here (10 × CPU count) is actually going to perform better: NVMe drives are very fast and benefit from a large amount of parallelism. But try different values and measure.
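For example, if you're on rayon's pool, its global size can be set before any parallel work runs (a sketch; the 10× factor is just a starting point to measure against):

use rayon::ThreadPoolBuilder;

fn main() {
    // Oversubscribe the CPU: I/O-bound threads spend most of their time
    // waiting, so more threads than cores can keep the NVMe queue full.
    let cpus = std::thread::available_parallelism()
        .map(|n| n.get())
        .unwrap_or(8);
    ThreadPoolBuilder::new()
        .num_threads(cpus * 10)
        .build_global()
        .expect("failed to configure rayon thread pool");

    // ... then run the par_iter() pipeline as before ...
}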

Ignore any other optimizations. IO is definitely dominant here.


Yup. It is an 8-core CPU with 16 threads.

Slicing, instead of using vectors, increased the speed a little.

Yes, indeed. I am trying that.