Parallel execution of nested loop and collection of multiple occurences of values

Long title: Parallel execution of the combination (sum, subtraction, mult,...) of each element of two vectors / arrays with sizes n and m with each element of the element of the other to an output with size n x m and collection into a hashmap.

Hi Rust community!

I have here a system of nested loops where elements of vectors are combined and I want to execute that in parallel. Then I want to create some sort of histogram with a hashmap to get the occurrence of each combination by counting. I decided to use a hashmap because using the keys is practical to me.

A combination here is the absolute value of the difference of each possible pair (in the real case it will be triples and a number will be attached).

The whole code can be also found on the playground: Rust Playground

In v1 it is done in one step, in v2 the combinations are calculated first and then the occurrences therein are counted. From the aspect of functional programming and reusability the v2 appeals better to me, but v1 might need less memory.

    //=============================
    // alternative variant v1:
    let mut grouped = HashMap::new();

    for v1 in points1.iter() {
        for v2 in points2.iter() {
            let diff: i32 = v1 - v2;
            let norm = diff.abs();
            let counter = grouped.entry(norm).or_insert(0);
            *counter += 1;
        }
    }

    //=============================
    // variant v2:
    
    // preprartion of array that holds the combinations
    // an unninitialized vector -> array is used because each element will be replaced

    let n = points1.len();
    let m = points2.len();
    let size = n * m;
    let mut combinations: Vec<i32> = Vec::with_capacity(size);
    unsafe {
        combinations.set_len(size);
    }
    let mut combinations = Array1::from(combinations);


    //let mut combinations = Vec::new();
    for (i, v1) in points1.iter().enumerate() {
        for (j, v2) in points2.iter().enumerate() {
            let index = i * n + j;
            let diff: i32 = v1 - v2;
            let norm = diff.abs();
            combinations[index] = norm;
            //combinations.push(norm);
        }
    }

    let mut grouped = HashMap::new();

    combinations.iter().for_each(|comb| {
        let counter = grouped.entry(&*comb).or_insert(0);
        *counter += 1;
    });

In the following part I atttempt to parallelize the loop to get the combinations from v2 by a) changing the elements by index of a predefined array or b) by collecting into a new vector.
For a) I could use a mutex to be able to borrow the array mulably (but the benchmark test of the optimized version below shows that this does worse than serial execution). Futhermore one needs to get the data back from the mutex.
In b) I have a problem with collecting into a Vec, giving an error.

    //=============================
    a)

    points1.into_par_iter().enumerate().for_each(|(j, v1)| {
        points2.par_iter().enumerate().for_each(|(i, v2)| {
            let index = j * n + i;
            //println!("{:?}", index);
            let diff: i32 = v1 - v2;
            let norm = diff.abs();
            let mut arr = combinations.lock().unwrap();
            arr[index] = norm;
            //combinations[index] = norm;
        })
    });

    // make mutex useable
    let combinations = f(combinations);
 
    //=============================
    b)

    let test: Vec<i32> = points1
        .par_iter()
        .map(|v1| {
            points2
                .par_iter()
                .map(|v2| {
                    let diff: i32 = v1 - v2;
                    let norm = diff.abs();
                    norm
                })
                // ----------------> "sum" is added here to make the code compilable <---------------
                //.sum()
        })
        .collect();
error[E0277]: the trait bound `Vec<i32>: FromParallelIterator<rayon::iter::Map<rayon::slice::Iter<'_, i32>, [closure@src/main.rs:162:22: 166:18]>>` is not satisfied
   --> src/main.rs:170:10
    |
170 |         .collect();
    |          ^^^^^^^ the trait `FromParallelIterator<rayon::iter::Map<rayon::slice::Iter<'_, i32>, [closure@src/main.rs:162:22: 166:18]>>` is not implemented for `Vec<i32>`
    |
    = help: the following implementations were found:
              <Vec<T> as FromParallelIterator<T>>

How to resolve it?

I think parallelizing the counting of occurences with the hashmap is a task that might be difficult but the benchmarks show that it is a crucial step.

combinations.iter().for_each(|comb| {
    let counter = grouped.entry(&*comb).or_insert(0);
    *counter += 1;
});

What might be a better way? To sort the array beforehand and to add the keys only then, but without checking if an element exists (e.g. by means of an manual index)? Or using an existing crate and transform to a hashmap then?

But. for example in my case, for "already" <100_100 elements (40_000_000_000_000 bytes), my computer runs out of memory and the execution fails. So direct collection seems attractive.

One snapshot of the execution times:

creating points
points created: 56.292ยตs

START direct collection v1
direct collection: 1.532850622s

START combination seperate v2
combination seperate: 140.865953ms
START grouping v2
grouped seperate: 1.580918659s

START alternative a)
alternative a): 5.819199738s

I hope this place is suited to discuss this problem.

Opinions and help for improvement are very welcome. Also if there could be a relevantly faster design in general. Thanks!

You want to use flat_map instead of map here โ€“ this will allow you to have a "one-layer" iterator over norms and you no longer would need a sum() to flatten it. (There are actually two methods โ€“ flat_map and flat_map_iter. I'd use the second one, as the outer iterator is "parallel enough").

So I guess approach v2 with collecting to Vec, then processing is not really viable :upside_down_face:. I also find v1 to be more readable, but I guess that's just an opinion and the "functional appeal" of v2 may have some adventages. Anyway, you don't need to collect to have this "functional feel". Check out:

let diffs = points1.iter().flat_map(|v1| {
    points2.iter().map(move |v2| (v1 - v2).abs())
});
let mut grouped = HashMap::new();
diffs.for_each(โ€ฆ)

And actually, we can get a step further, and translate the last for_each into yet another iterator operation โ€“ fold:

let grouped = diffs.fold(HashMap::new(), |mut grouped, diff| {
    *grouped.entry(diff).or_insert(0) += 1;
    grouped
});

Note: This might be less readable than the foreach approach, but a general tip with working with rayon is to first translate everything into iterator chain (without for_each preferable) instead of operating on mutable structures. Then translating into rayon should be "just add par_iter()".

So now we need to translate this fold into rayon. Rayon has a fold operation, but it returns an iterator, which might be surprising. Fortunately, in the docs there's a nice description of fold+reduce pattern with en example. Check it out.

So, the final code could look something like:

let grouped = points1.par_iter()
    .flat_map_iter(|v1| {
        points2.iter().map(move |v2| (v1 - v2).abs())
    })
    .fold(|| HashMap::new(), |mut grouped, diff| {
        *grouped.entry(diff).or_insert(0) += 1;
        grouped
    })
    .reduce(|| HashMap::new(), |grouped1, grouped2| {
         /* combine two hashmaps */
    });

from my understanding this shouldn't take too much memory, as the number of hashmaps created should be roughly proportional to the number of CPUs you have, if the docs don't lie.


disclaimer: all code snippets written by-hand, not tested anything edit: fixed the code snipets, they should compile (btw, a repo or full code with easy way to repro the numbers would be helpful, sorry, I missed the link to the playground, my bad!)

2 Likes

Dear krdln!
I thank you for your answer. I will implement it asap and write again.

-> In my post I have this line:

Is it something different you mean that I should post? I backchecked and (at least now) the link is accessible. I can post an updated link if I get the thing running. :+1:

The compiler tells me that I must change

by inserting "move" to

let diffs = points1.iter().flat_map(|v1| {
    points2.iter().map(move |v2| (v1 - v2).abs())
});

Doing so, I could collect the vector as Vec<i32>.
I am working on the fold/reduce part.

The error was:

   Compiling playground v0.0.1 (/playground)
error[E0373]: closure may outlive the current function, but it borrows `v1`, which is owned by the current function
   --> src/main.rs:153:25
    |
153 |     points2.iter().map( |v2| (v1 - v2).abs())
    |                         ^^^^  -- `v1` is borrowed here
    |                         |
    |                         may outlive borrowed value `v1`
    |
note: closure is returned here
   --> src/main.rs:153:5
    |
153 |     points2.iter().map( |v2| (v1 - v2).abs())
    |     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
help: to force the closure to take ownership of `v1` (and any other referenced variables), use the `move` keyword
    |
153 |     points2.iter().map( move |v2| (v1 - v2).abs())
    |                         ^^^^^^^^^

error: aborting due to previous error

For more information about this error, try `rustc --explain E0373`.
error: could not compile `playground`

To learn more, run the command again with --verbose.

Sorry, I haven't noticed the playground link. I probably focused too much on the code snippets and thus missed the link :upside_down_face:

Yup, that's correct, as I said I haven't really verified the snippets :slight_smile: . Very nice that the compiler suggests a correct fix there! Sidenote: I remember encountering similar problem with closure in flat_map, but don't remember this suggestion with move. Looks like this suggestion was introduced in 1.36.

Anyway, I've fixed the code snippets so the code snippets should compile now.

Btw, I looked at the playground link and I think the numbers you have are from debug mode. When I switched to release, I got numbers on the order of magnitude of ms, not s. I've also tried running this playground, but I guess you don't have access to multiple CPU cores on playgrounds, so I tried running it locally. The parallel versions is faster, but just barely.. I guess the most costly thing is joining the maps. Tried the BTreeMap (as the data is sorted (so in theory BTree map has better cache locality), but it's not faster. If you know somehow that max diff is bounded, perhaps you can try using Vec for storing the histogram (with norms as its indices)?

1 Like

I couldn't resist and I've experimented a little bit more... And I've managed to make parallel version significantly faster! Here are two tweaks I've used:

with_min_len

     let grouped = points1
         .par_iter()
+        .with_min_len(20)
         .flat_map_iter(|v1| points2.iter().map(move |v2| (v1 - v2).abs()))

this tells rayon to avoid parallelizing chunks smaller than particular len. This option is especially useful, if we know that the workload is distributed evenly between elements (and we know that, as we're basically doing the same thing in each iteration). I tried different values, and got reasonable results between 10 to 125 (num of elems / num threads) (126 made it worse :smiley:). But I guess it's best to keep this value to "smallest one that still gives good speed" to avoid pessimizing in case you run program with different # of threads.

Another tweak โ€“ always iterate over smaller map and insert into bigger one:

             |mut grouped1, mut grouped2| {
+                if grouped2.len() > grouped1.len() {
+                    std::mem::swap(&mut grouped1, &mut grouped2);
+                }
                 for (k, v) in grouped2 {
                     *grouped1.entry(k).or_insert(0) += v;
                 }

This is especially helpful, as rayon sometimes calls the function with grouped1=empty.

Now I'm getting 43ms for serial version and 9ms for parallel (4-core + HT).

2 Likes

Hi.

Thanks, you are great, thank you. I'll implement it today.

What do you mean with bounded? That there's an upper limit?

In some sense there's always an upper limit but there's no general upper limit. What I can imagine is to to take the maximum value and the minimum value and the difference will be the largest distance.
Since I plan to do it with 3D coordinates, this leads to another interesting problem namely that we would need to find the value of the most distant distance in this system. I've looked around and the people doing something by using e.g. convex hulls etc

https://web.archive.org/web/20180528104521/http://www.tcs.fudan.edu.cn/~rudolf/Courses/Algorithms/Alg_ss_07w/Webprojects/Qinbo_diameter/2d_alg.htm

What I do not know is how to correlate the index with the value of the key (there could be many empty ones). I could imagine working with two vectors or a vector of tuples (and maybe sorting after each new insertion), but I don't know if it would be faster. In a general setting, I could imagine that one could calculate all differences hence obtain then a histogram but with using iterators I think that this will be not possible (without many adjustments).

I think I worded myself poorly, sorry for that. What I've meant was "small enough upper limit so that the diffs will be dense".

I meant that you can just use index=key. Of course, if your data is not dense or if there are some high outliers, it will eat your memory :slight_smile: And this approach will not work for floating points, ofc.

But from you last post, now I understand more what's the high-level idea, so I guess assumption about density goes out of the window. I was also thinking about some faster algo (sub-quadratic) for creating histogram, but since you're saying you want to do it in 3D, I have no idea. I guess there might be some faster solutions for approximate histogram though?

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.