Can this be parallelized?

Hi,

Please see the code below.
I would like to know whether the for loops in the code can be parallelized. I believe the one with the rollsum can't, but I might be wrong! I am not sure about the last one; I get errors related to the variable vans when I try.

Side note: any comments to improve the code below would be appreciated.

Thank you for your help.

use rand::Rng;

// Split the index range 0..len into n (start, end) pairs, end inclusive.
fn make_chunk(len: usize, n: usize) -> Vec<(usize, usize)> {
    let m: usize = len / n;
    let mut chks = (0..len)
        .step_by(m)
        .zip((0..len).skip(m - 1).step_by(m))
        .collect::<Vec<_>>();
    // The division truncates, so stretch the last chunk to cover the remainder.
    if chks[n - 1].1 != (len - 1) {
        chks[n - 1].1 = len - 1;
    }
    chks
}

fn main() {
    let n: usize = 4;
    let len: usize = 50;
    
    let mut rng = rand::thread_rng();
    let range = rand::distributions::Uniform::new(0, 25);
    let x: Vec<u64> = (0..len).map(|_| rng.sample(range)).collect();
    
    let chks: Vec<(usize, usize)> = make_chunk(len, n);

    let mut counts: Vec<Vec<usize>> = vec![vec![0_usize; len]; n]; // a matrix of dim n x len
    
    std::thread::scope(|s| {
        for (chk, count) in std::iter::zip(&chks, &mut counts) {
            s.spawn(|| {
                for i in (chk.0)..=(chk.1) {
                    count[x[i] as usize] += 1;
                }
            });
        }
    });
    // Exclusive prefix sum (the "rolling sum") over counts, walking column-major.
    let mut rollsum: usize = 0;
    for j in 0..len { // <- Can this be parallelized ?
        for i in 0..n {
            let tmp = counts[i][j];
            counts[i][j] = rollsum;
            rollsum += tmp;
        }
    }
    let mut vans: Vec<u64> = vec![0_u64; len];
    // Scatter each element into its sorted position using the offsets in counts.
    for i in 0..n { // <- Can this be parallelized ?
        for j in (chks[i].0)..=(chks[i].1) {
            let idx = x[j] as usize;
            vans[counts[i][idx]] = x[j];
            counts[i][idx] += 1;
        }
    }
   
    println!("x = {:?}\n", x); // Original vector
    println!("vans = {:?}", vans); // this should be sorted now!
}

What is the meaning of rollsum? It seems that rollsum is accumulated in order.

What's more, vec.chunks(vec.len().div_ceil(N)) would divide a slice into N chunks, which might be what you want to do.
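
For example (a quick sketch; div_ceil is stable since Rust 1.73, and N = 4 here is just an illustration):

fn main() {
    const N: usize = 4;
    let v: Vec<u64> = (0..50).collect();
    // div_ceil rounds up, so we get at most N chunks and the
    // last chunk simply comes out shorter than the others.
    for (i, chunk) in v.chunks(v.len().div_ceil(N)).enumerate() {
        println!("chunk {i}: {} elements", chunk.len());
    }
}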

Yes, you are right, rollsum accumulates in order. I am basically doing a rolling sum over the vector of vectors counts.
chks already contains the chunks that should be processed in parallel; I create it with make_chunk.

Cumulative sum can be parallelized. Basically you do the following:

  1. Divide your array into N chunks.
  2. In parallel, compute the sum of each chunk. This yields an array of sums of length N.
  3. Compute the exclusive prefix sum of the array of length N.
  4. In parallel, compute the prefix sum of each chunk separately, starting at the corresponding value in the array of sums.

There are a ton of variations on this. It's a pretty fundamental parallel algorithm. You might find it helpful to look at how the parallel scan works in Intel TBB.
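
Here is a minimal single-threaded sketch of those four steps, just to make the data flow concrete (the two "in parallel" steps are written as plain loops here):

fn main() {
    let data = [2, 6, 8, 5, 1, 9, 4, 3];
    let n = 2; // pretend we have two threads

    // Step 1: divide the array into n chunks.
    let chunk_len = data.len().div_ceil(n);

    // Step 2: sum each chunk (the first parallel step).
    let mut sums: Vec<i32> = data.chunks(chunk_len).map(|c| c.iter().sum()).collect();
    assert_eq!(sums, [21, 17]);

    // Step 3: exclusive prefix sum of the n chunk sums (sequential, but tiny).
    let mut acc = 0;
    for s in sums.iter_mut() {
        let tmp = *s;
        *s = acc;
        acc += tmp;
    }
    assert_eq!(sums, [0, 21]);

    // Step 4: prefix sum within each chunk, seeded with that chunk's
    // offset from step 3 (the second parallel step).
    let mut out = data;
    for (chunk, &offset) in out.chunks_mut(chunk_len).zip(&sums) {
        let mut acc = offset;
        for v in chunk {
            acc += *v;
            *v = acc;
        }
    }
    assert_eq!(out, [2, 8, 16, 21, 22, 31, 35, 38]);
}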

4 Likes

Thank you. I will read up on this topic.
What about the last double for loop?

It looks like it can be parallelized but it will be very painful in Rust if you are used to the wild west of OpenMP. If you parallelize the outer for loop, each thread is writing to a separate region of vans if you did the prefix sum correctly. But you can't prove that to the compiler. You might be able to use split_at_mut to separate vans into the disjoint output ranges first, then hand them off to their threads.
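
For example, a rough sketch of the split_at_mut hand-off (the sizes here are made up and contiguous; your real ranges would come out of the prefix sum):

fn main() {
    let mut vans = vec![0u64; 10];
    // Hypothetical disjoint output ranges, one per thread.
    let sizes = [3usize, 4, 3];

    std::thread::scope(|s| {
        let mut rest: &mut [u64] = &mut vans;
        for (t, &size) in sizes.iter().enumerate() {
            // Peel off the front of the slice; `rest` keeps the tail.
            let (piece, tail) = std::mem::take(&mut rest).split_at_mut(size);
            rest = tail;
            s.spawn(move || {
                // Each thread owns its piece, so there is nothing left to prove.
                for slot in piece.iter_mut() {
                    *slot = t as u64; // placeholder for the real scatter
                }
            });
        }
    });
    assert_eq!(vans, [0, 0, 0, 1, 1, 1, 1, 2, 2, 2]);
}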

1 Like

Yes, with OpenMP it is easier to parallelise (but that's because the compiler does not do all the checks that Rust does to keep you from shooting yourself in the foot). I am very new to Rust, and even more so to multi-threading. I'll probably switch back to C if I fail to make it work. Thanks for your help and the hints.

You can always write unsafe code with raw pointers in Rust. As long as you don't violate the aliasing rules it's no worse than what you'd do in C / C++ / Cuda.
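
For instance, a minimal sketch of that pattern (the SendPtr wrapper is made up for this example; the whole safety argument rests on the write ranges being disjoint):

// Raw pointers are not Send, so wrap one in a tiny Copy type that we
// assert is safe to move across threads.
#[derive(Clone, Copy)]
struct SendPtr(*mut u64);
unsafe impl Send for SendPtr {}

fn main() {
    let mut vans = vec![0u64; 8];
    let ptr = SendPtr(vans.as_mut_ptr());

    std::thread::scope(|s| {
        for t in 0..2usize {
            s.spawn(move || {
                // SAFETY: thread 0 writes indices 0..4 and thread 1 writes
                // indices 4..8; the ranges are disjoint, so nothing aliases.
                for i in (t * 4)..(t * 4 + 4) {
                    unsafe { *ptr.0.add(i) = t as u64 };
                }
            });
        }
    });

    assert_eq!(vans, [0, 0, 0, 0, 1, 1, 1, 1]);
}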

@rustboy16 Here is an example implementation of a parallel prefix sum. This version updates the data in place, and it uses a fixed partition size, which you wouldn't do in production code. I implemented this computation with OpenGL compute shaders for my thesis, to compute the offsets for a list of dynamically sized lists (Vec<Vec<_>>).

fn main() {
    const PARTITION_SIZE: usize = 2;

    let mut data = vec![2, 6, 8, 5, 1, 9];

    let mut sums = (0..((data.len() + PARTITION_SIZE - 1) / PARTITION_SIZE))
        .map(|_| 0)
        .collect::<Vec<_>>();
    assert_eq!(sums, &[0, 0, 0]);

    // Partition the input and compute sums of each partition. Normally you would use a partition
    // count equal to the number of cores you can use in parallel.
    std::thread::scope(|scope| {
        data.chunks(PARTITION_SIZE)
            .zip(sums.iter_mut())
            .for_each(|(chunk, sum)| {
                scope.spawn(|| *sum = chunk.iter().sum());
            });
    });
    assert_eq!(sums, &[8, 13, 10]);

    // Compute the running sum of the sums of each partition.
    {
        let mut sum = 0;
        for item in sums.iter_mut() {
            let temp = *item;
            *item = sum;
            sum += temp;
        }
    }
    assert_eq!(sums, &[0, 8, 21]);

    // Compute the running sum of each partition using the sums of each partition.
    std::thread::scope(|scope| {
        data.chunks_mut(PARTITION_SIZE)
            .zip(sums.iter().copied())
            .for_each(|(chunk, mut sum)| {
                scope.spawn(move || {
                    for item in chunk.iter_mut() {
                        sum += *item;
                        *item = sum;
                    }
                });
            });
    });
    assert_eq!(data, &[2, 8, 16, 21, 22, 31]);
}

The above uses mutation so that we do not have to collect scoped thread join handles and results. Thanks to @steffahn for helping me with this.

1 Like

If you aren't interested in the return values of the threads (which are allowed to be interesting, non-() types, as long as the type is Send), then for the purpose of waiting for the threads to finish and propagating panics that happened in any thread, there's nothing you need to do: thread::scope already does that for you. So you could just turn the map().collect() into for_each() and skip the loop with the join() calls entirely.

Edit: This of course only applies to the second part of your program, where the threads' return values are actually ignored.
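
For illustration, here is a toy example of the handle-collecting style that becomes unnecessary once the return values are ignored (the data and chunk size are arbitrary):

fn main() {
    let data = [1, 2, 3, 4, 5, 6];

    let sums = std::thread::scope(|scope| {
        // Collecting the ScopedJoinHandles and calling join() hands the
        // per-thread results back...
        let handles = data
            .chunks(2)
            .map(|chunk| scope.spawn(move || chunk.iter().sum::<i32>()))
            .collect::<Vec<_>>();
        handles
            .into_iter()
            .map(|handle| handle.join().unwrap())
            .collect::<Vec<i32>>()
    });
    assert_eq!(sums, [3, 7, 11]);

    // ...but when each thread instead writes its result through an &mut,
    // the implicit join at the end of scope() is all you need.
}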

1 Like

Right, that would be a possibility. You could then zip that with your other iterator

data
   .chunks(2)
   .zip(&mut sums)
   .for_each(|(chunk, s)| {
       scope.spawn(|| *s = chunk.iter().sum());
   })

and write the results in the threads that way.

Whether that approach is any nicer is probably a matter of taste ^^

1 Like

Here is a version that partitions according to the available parallelism.

fn ceil_div(value: usize, divider: usize) -> usize {
    (value + divider - 1) / divider
}

fn running_sum(values: &mut [i32], mut sum: i32) {
    for value in values.iter_mut() {
        sum += *value;
        *value = sum;
    }
}

fn running_sum_shifted(values: &mut [i32], mut sum: i32) {
    for value in values.iter_mut() {
        let temp = *value;
        *value = sum;
        sum += temp;
    }
}

fn main() {
    let mut data = vec![
        2, 6, 8, 5, 1, 9, 2, 6, 2, 6, 1, 4, 8, 9, 4, 4, 7, 2, 2, 7, 98, 57, 58, 3, 32, 58, 3, 38,
        54, 73, 7, 3, 34, 7, 3, 3, 74, 3, 45, 4, 4, 78, 7, 45, 45,
    ];

    // Single threaded version.
    let mut expected = data.clone();
    running_sum(&mut expected, 0);

    let thread_count = std::thread::available_parallelism()
        .map(std::num::NonZeroUsize::get)
        .unwrap_or(1);
    dbg!(thread_count);

    let partition_size = ceil_div(data.len(), thread_count);
    dbg!(partition_size);

    let mut sums = (0..thread_count).map(|_| 0).collect::<Vec<_>>();

    // Partition the input and compute sums of each partition.
    std::thread::scope(|scope| {
        data.chunks(partition_size)
            .zip(sums.iter_mut())
            .for_each(|(chunk, sum)| {
                scope.spawn(|| *sum = chunk.iter().sum());
            });
    });

    // Compute the running sum of the sums of each partition.
    running_sum_shifted(&mut sums, 0);

    // Compute the running sum of each partition using the sums of each partition.
    std::thread::scope(|scope| {
        data.chunks_mut(partition_size)
            .zip(sums.iter().copied())
            .for_each(|(chunk, sum)| {
                scope.spawn(move || {
                    running_sum(chunk, sum);
                });
            });
    });

    assert_eq!(data, expected);
}

1 Like

Thanks, is there a place where I can find more examples like this with multithreading?
