How to parallelize this code?

Hello everybody,

I have the following sequential code and I need to parallelize it. Since it accesses different indexes, I can't find a way to do it. Can you think of an alternative?

Here is a simplified example of the code. I need to parallelize the first for loop.

Thank you!!


Hey there, this sounds like a nifty little puzzle to solve, so let's try to tackle it real quick. :slight_smile:

OK, so the first thing I tried is to get Rayon in here, because it is probably the easiest way out there to parallelize iterators. After adding Rayon and the into_par_iter() calls, we now have:

use rayon::prelude::*;

type Datatype = f32;
type Vector = [Datatype; N];

const N: usize = 1024;
const BLOCK_SIZE: usize = 16;
const DT: Datatype = 1_f32;

fn get_vector() -> Box<Vector> {
    let matrix = unsafe {
        let layout = std::alloc::Layout::new::<Vector>();
        let ptr = std::alloc::alloc_zeroed(layout) as *mut Vector;
        Box::from_raw(ptr)
    };

    matrix
}

fn main() {
    let mut xpos = get_vector();
    let mut ypos = get_vector();
    let mut zpos = get_vector();
    let mut forcesx = get_vector();
    let mut forcesy = get_vector();
    let mut forcesz = get_vector();

    (0..N).into_par_iter().step_by(BLOCK_SIZE).for_each(|ii| { //I need to parallelize this loop
        (0..N).into_par_iter().for_each(|j| {
            (ii..ii + BLOCK_SIZE).for_each(|i| {
                let dx = xpos[j] - xpos[i];
                let dy = ypos[j] - ypos[i];
                let dz = zpos[j] - zpos[i];

                forcesx[i - ii] += 5_f32;
                forcesy[i - ii] += 5_f32;
                forcesz[i - ii] += 5_f32;
            });
        });
    });
}

But now we run into some compiler errors:

error[E0596]: cannot borrow `forcesx` as mutable, as it is a captured variable in a `Fn` closure
  --> src/main.rs:30:44
   |
30 |             (ii..ii + BLOCK_SIZE).for_each(|i| {
   |                                            ^^^ cannot borrow as mutable
...
35 |                 forcesx[i - ii] += 5_f32;
   |                 ------- mutable borrow occurs due to use of `forcesx` in closure

What it's telling us here is that we can't have a mutable reference to forcesx in all of the different parallel iterations at the same time, because that would mean multiple threads mutating the same data at once, which could cause data races. That's the beauty of Rust and Rayon: they won't let us cause data races.

So we have to provide a way to have only one mutable reference to our forces. A good way (I think) to do this is with channels. We spawn one thread and give it ownership of our forces; then, in the parallel loops that do the computations, we send that thread commands that mutate the forces. So the forces are mutated sequentially (by necessity, to prevent data races and undefined behavior), but the data is computed in parallel.

We're going to use crossbeam channels because they are popular, fast, and well featured, but you could probably use other channels as well. This takes a little bit more work, but it's not too bad:

use rayon::prelude::*;
use crossbeam_channel::{bounded, unbounded};

type Datatype = f32;
type Vector = [Datatype; N];

const N: usize = 1024;
const BLOCK_SIZE: usize = 16;
const DT: Datatype = 1_f32;

fn get_vector() -> Box<Vector> {
    let matrix = unsafe {
        let layout = std::alloc::Layout::new::<Vector>();
        let ptr = std::alloc::alloc_zeroed(layout) as *mut Vector;
        Box::from_raw(ptr)
    };

    matrix
}

#[derive(Debug)]
enum ForceCommand {
    AddX(usize, f32),
    AddY(usize, f32),
    AddZ(usize, f32),
}

fn main() {
    let mut xpos = get_vector();
    let mut ypos = get_vector();
    let mut zpos = get_vector();
    let mut forcesx = get_vector();
    let mut forcesy = get_vector();
    let mut forcesz = get_vector();
    
    // Create channel for sending force mutations
    let (cmd_sender, cmd_receiver) = unbounded();
    
    // Create a channel for getting the resultant forces
    let (res_sender, res_receiver) = bounded(1);
    
    // Spawn a thread for mutating the forces
    std::thread::spawn(move || {
        for message in cmd_receiver {
            match message {
                ForceCommand::AddX(index, value) => forcesx[index] += value,
                ForceCommand::AddY(index, value) => forcesy[index] += value,
                ForceCommand::AddZ(index, value) => forcesz[index] += value,
            }
        } 
        res_sender.send((forcesx, forcesy, forcesz)).unwrap();
    });

    (0..N).into_par_iter().step_by(BLOCK_SIZE).for_each(|ii| {
        (0..N).into_par_iter().for_each(|j| {
            (ii..ii + BLOCK_SIZE).for_each(|i| {
                // Clone the sender so each iteration gets its own
                let cmd_sender = cmd_sender.clone();
                let dx = xpos[j] - xpos[i];
                let dy = ypos[j] - ypos[i];
                let dz = zpos[j] - zpos[i];

                cmd_sender.send(ForceCommand::AddX(i - ii, 5_f32)).unwrap();
                cmd_sender.send(ForceCommand::AddY(i - ii, 5_f32)).unwrap();
                cmd_sender.send(ForceCommand::AddZ(i - ii, 5_f32)).unwrap();
            });
        });
    });
    
    // Drop the sender: this makes sure that the thread that is waiting for
    // force-mutation commands stops looping once it finds that all senders
    // have been dropped and there are no more mutation commands to wait for.
    drop(cmd_sender);
    
    let (forcesx, forcesy, forcesz) = res_receiver.recv().unwrap();
}

Now we've got it parallel! Take note that I'm not sure all of the parallelism will actually make this faster; you will have to test it in your application. For instance, I put two parallel iterators in that example, but it might be more efficient to make only one of the iterators parallel so there is less context switching.

Hope this helps!


Hello!

First of all, thank you very much for the time and dedication in your message.

I tried your idea, but the problem is that it takes much longer than the plain sequential version. I think the channels are generating a lot of overhead.

It would be interesting to think of a solution with iterators. Once we have that version, we could think about using Rayon. But I can't figure out how to translate this into sequential iterators.

Here is the updated code.

In short, the problem is to parallelize this in the most efficient way:

(0..N).for_each(|i| {
    (0..N).for_each(|j| {
        (0..N).for_each(|k| {
            vector[k] += k as Datatype;
        });
    });
});

Looking at your original code (in particular forcesx[i - ii] += 5_f32), I think you'll have issues parallelising it as currently formulated.

From what I can see, your problem is that each thread will scan through the i's and ii's and mutate the (i - ii)'th force component. You'll probably end up with multiple threads all stepping on each other's toes when updating forcesx[i - ii].

Can you rephrase this to create a new copy of the matrix (e.g. with a .collect())? That's what you often need to do when writing GPU code because parallelism and mutation don't tend to mix well.
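For instance, here is a minimal sketch of that idea (my own illustrative code, not the OP's; forces_x and the simple xpos[j] - xpos[i] term are stand-ins for the real force calculation): each parallel task computes its own element, and the results are collected into a brand-new vector, so no shared buffer is ever mutated.

use rayon::prelude::*;

fn forces_x(xpos: &[f32]) -> Vec<f32> {
    (0..xpos.len())
        .into_par_iter()
        .map(|i| {
            // Each task owns its own accumulator, so there is no shared mutation.
            (0..xpos.len())
                .map(|j| xpos[j] - xpos[i]) // placeholder for the real pairwise force
                .sum()
        })
        .collect()
}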


How fast is the serial code and does it generate vector (SIMD) instructions? If the code is too fast, multi-threading will work against you.


Yeah, that is the core problem here. The only ways (that I know of) to mutate your vector in parallel are shared, locked memory or channels. We've already tried channels, and shared locked memory is going to be really bad in this case because all of your parallel threads would just end up waiting for each other for the lock on the vector you are mutating to be released.

It may well be that you aren't actually going to gain anything by multi-threading here because the cost of sending values between threads is too high, like @Phlopsi said. SIMD would be something to look into here.

The last thing I can think of would be to take slices of your vectors and loop through a different section in each thread, so that there is absolutely no overlap between what the threads are working on, and then combine the diffs after they all finish:

type Datatype = f32;
type Vector = [Datatype; N];

const N: usize = 1024;
const BLOCK_SIZE: usize = 16;
const DT: Datatype = 1_f32;

fn get_vector() -> Box<Vector> {
    let matrix = unsafe {
        let layout = std::alloc::Layout::new::<Vector>();
        let ptr = std::alloc::alloc_zeroed(layout) as *mut Vector;
        Box::from_raw(ptr)
    };

    matrix
}

const THREADS: usize = 4;

fn main() {
    let mut xpos = get_vector();
    let mut ypos = get_vector();
    let mut zpos = get_vector();
    let mut forcesx = get_vector();
    let mut forcesy = get_vector();
    let mut forcesz = get_vector();

    let (sender, receiver) = crossbeam::channel::unbounded();

    // Create a scope of threads that can take references to our pos vectors
    crossbeam::scope(|scope| {
        // Loop through the number of worker threads we want
        for thread_num in 0..THREADS {
            // Create a diff vectors to store our mutations in for this thread
            let mut xdiffs = get_vector();
            let mut ydiffs = get_vector();
            let mut zdiffs = get_vector();
            // Get references to our positions
            let xpos = &xpos;
            let ypos = &ypos;
            let zpos = &zpos;
            // Clone the sender
            let sender = sender.clone();
            
            // Spawn a new thread to do the work
            scope.spawn(move |_| {
                // Operate on only the slice of the indexes that is specific to
                // this thread: a disjoint chunk of N / THREADS elements.
                let start = thread_num * (N / THREADS);
                let end = start + N / THREADS;
                (start..end)
                    .step_by(BLOCK_SIZE)
                    .for_each(|ii| {
                        (start..end).for_each(|j| {
                            (ii..ii + BLOCK_SIZE)
                                .zip(&mut xdiffs[..])
                                .zip(&mut ydiffs[..])
                                .zip(&mut zdiffs[..])
                                .for_each(|(((i, xdiff), ydiff), zdiff)| {
                                    // Do our operations
                                    let dx = xpos[j] - xpos[i];
                                    let dy = ypos[j] - ypos[i];
                                    let dz = zpos[j] - zpos[i];

                                    // Mutate the thread-local diff instead of
                                    // global forces. This is safe and non-locked
                                    *xdiff += dx;
                                    *ydiff += dy;
                                    *zdiff += dz;
                                });
                        });
                    });
                    
                // Send all the diffs calculated by this thread
                sender.send((xdiffs, ydiffs, zdiffs)).unwrap();
            });
        }
    })
    .unwrap(); // propagate panics from the worker threads

    drop(sender);

    // Grab the diffs from all of the worker threads and apply them.
    // Note: there will only be one message sent over the channel per worker, so
    // you'll only get THREADS (here, four) sends and receives like this, which
    // means the overhead is minimal.
    receiver.iter().for_each(|(xdiff, ydiff, zdiff)| {
        forcesx
            .iter_mut()
            .zip(xdiff.iter())
            .for_each(|(x, diff)| *x += diff);
        forcesy
            .iter_mut()
            .zip(ydiff.iter())
            .for_each(|(y, diff)| *y += diff);
        forcesz
            .iter_mut()
            .zip(zdiff.iter())
            .for_each(|(z, diff)| *z += diff);
    });
}

Ideally, the mutable part should be the outer loop driving parallelism, so its mutation can be split into independent pieces of work. Something like:

forcesx.par_iter_mut()
    .zip(&mut forcesy[..])
    .zip(&mut forcesz[..])
    .for_each(|((x, y), z)| {
        //...
    });

You'll need to adjust the remaining immutable iterators to do a calculation in terms of a given element, instead of updating multiple locations.
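To make that concrete, here is a rough sketch under the same simplifying assumptions as the example above (one component only, a placeholder force term, and a function name of my choosing): the force buffer drives the parallel outer loop, and the inner loop only reads the positions.

use rayon::prelude::*;

fn compute_forces(xpos: &[f32], forcesx: &mut [f32]) {
    forcesx.par_iter_mut().enumerate().for_each(|(i, fx)| {
        // Accumulate into a task-local variable and write once at the end.
        let mut acc = 0.0_f32;
        for j in 0..xpos.len() {
            acc += xpos[j] - xpos[i]; // placeholder for the real force term
        }
        *fx += acc;
    });
}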


Parallelization is an act of exploiting associativity (x + (y + z) = (x + y) + z). You are doing a summation of f32s. Floating-point addition isn't associative, unlike real-number addition, but it approximately is. Assuming the result is allowed to change slightly, the summation can be decomposed into smaller summations in a way that allows parallelization. Just using channels or locks won't work, as the actual work is still serialized.
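As a small illustration of that point (my own example, not code from this thread): Rayon's reduce splits the sum into independent partial sums and then combines them, which is only valid because we accept a slight reordering of the floating-point additions.

use rayon::prelude::*;

fn parallel_sum(values: &[f32]) -> f32 {
    values
        .par_iter()
        .copied()
        // (x + y) + z may effectively become x + (y + z); we accept the tiny
        // difference in exchange for parallelism.
        .reduce(|| 0.0, |a, b| a + b)
}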

That being said, I haven't fully understood your code yet, but it looks like the quadratic loop isn't even necessary (assuming the floating-point associativity approximation). I'll use mathematical notation to represent the operation and apply algebraic laws to simplify it.

Edit: I noticed your code is a "simplified" one, so it isn't the exact code you want to optimize, right? Your "updated" version doesn't compile. I can't give an analysis because I can't be sure what exact operation you want to compute.

Thank you all for responding. It helped me a lot.

I took some time to analyze what you raised and to modify my algorithm. Here is the full algorithm.

Can you think of any optimizations? I'm afraid this is not the best version that can be done.

Thanks again!!!


It looks like you're doing velocity Verlet to integrate an n-body gravitational system?

Yes sir, that's right.

At first glance, I'd switch from using a powf to a sqrt plus a powi. The former is basically the cost of a logarithm plus an exponential. It's unlikely that the compiler will notice that your 1.5 power could be computed more efficiently. Of course this makes the code harder to parallelize effectively by making it faster.

I'd also make sure your serial code uses Newton's third law to avoid computing forces twice, as that could give you an additional factor-of-two speedup.
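A minimal sketch of that trick (my own illustrative code, with a placeholder force term): loop over unordered pairs with j > i and apply each computed force to both bodies with opposite signs, so every pair is evaluated only once.

fn accumulate_forces(xpos: &[f32], forcesx: &mut [f32]) {
    let n = xpos.len();
    for i in 0..n {
        for j in (i + 1)..n {
            let f = xpos[j] - xpos[i]; // placeholder for the real pairwise force
            forcesx[i] += f;
            forcesx[j] -= f; // equal and opposite reaction
        }
    }
}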

I'd prefer to use normal Verlet, where you just store three copies of the position, which can be permuted around to avoid allocations. In normal Verlet you would only need one loop, which should improve things a bit. You also need just one struct, which is position.
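For reference, a rough sketch of that update for a single component (my own code; it assumes the accelerations for the current step have already been computed), with the three position buffers rotated instead of reallocated:

fn verlet_step(
    prev: &mut Vec<f32>,  // x_{n-1}
    curr: &mut Vec<f32>,  // x_n
    next: &mut Vec<f32>,  // scratch buffer that receives x_{n+1}
    accel: &[f32],
    dt: f32,
) {
    // Normal (position) Verlet: x_{n+1} = 2 x_n - x_{n-1} + a_n * dt^2
    for i in 0..curr.len() {
        next[i] = 2.0 * curr[i] - prev[i] + accel[i] * dt * dt;
    }
    // Rotate the three buffers so nothing is reallocated.
    std::mem::swap(prev, curr); // prev now holds x_n
    std::mem::swap(curr, next); // curr now holds x_{n+1}
}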

In fact, even with your algorithm there is no need to store all the forces; you can combine the loops and just use each force right after you compute it.

I'd also absolutely cut all the unsafe code. It doesn't seem likely to help at all.


Thank you!

I didn't understand how you propose to replace this dsquared.powf(3.0 / 2.0). Could you explain?

dsquared.sqrt().powi(3)

Wouldn't dsquared.sqrt() * dsquared be even faster?
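They should be mathematically equivalent for dsquared >= 0, since (d^2)^(3/2) = sqrt(d^2) * d^2; a tiny sketch (my own example) of the cheapest form:

fn dist_cubed(dsquared: f32) -> f32 {
    debug_assert!(dsquared >= 0.0);
    // Equal (up to rounding) to dsquared.powf(1.5) and dsquared.sqrt().powi(3),
    // but needs only one square root and one multiply.
    dsquared.sqrt() * dsquared
}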


I have my updated code.

What I do is calculate the forces before processing the bodies. I also stopped processing by blocks.

I get better results, but I would like to know what you think. One thing I don't understand is why it goes much slower if I remove the thread limit and let Rayon decide how many threads to use (on my server it works well with 32). Is there any explanation?

Thanks again!

What is very time-consuming is the calculation of the forces. I have to do N^2 iterations, but I don't know if the way I do it (a map with a fold inside) is the best.
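Roughly, the shape is something like this (a simplified sketch of the structure; the real force term is more involved): a parallel map over the bodies with a sequential fold over all the others.

use rayon::prelude::*;

fn forces(xpos: &[f32]) -> Vec<f32> {
    xpos.par_iter()
        .enumerate()
        .map(|(i, &xi)| {
            xpos.iter()
                .enumerate()
                .filter(|&(j, _)| j != i)
                // Placeholder for the real pairwise force term.
                .fold(0.0_f32, |acc, (_, &xj)| acc + (xj - xi))
        })
        .collect()
}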

If N^2 is the problem, you can use Barnes-Hut.
