Optimize array sums

You can use iterators with a for loop; you just avoid explicit indexes. Since you are working with chunks of BLOCK_SIZE elements, you should take a look at the chunks method of slices (the windows method is also useful, but it yields overlapping windows rather than disjoint chunks).
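
As a quick illustration (a toy example of mine, not from the original code), chunks yields disjoint sub-slices while windows yields overlapping ones:

    let v = [1, 2, 3, 4, 5];
    // `chunks` yields disjoint sub-slices; the last one may be shorter.
    let chunks: Vec<&[i32]> = v.chunks(2).collect();
    assert_eq!(chunks, [&[1, 2][..], &[3, 4][..], &[5][..]]);
    // `windows` yields overlapping sub-slices of exactly the requested length.
    let windows: Vec<&[i32]> = v.windows(2).collect();
    assert_eq!(windows, [&[1, 2][..], &[2, 3][..], &[3, 4][..], &[4, 5][..]]);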

I have swapped the for k in and for i in loops, to keep the same chunk of ij elements for each iteration of the outermost loop. Then it is possible to rewrite it like this:

    use std::ops::Add;

    pub fn array_sums_<T: Copy + Add<Output = T> + Ord>(
        block_size: usize,
        ik: &[T],
        kj: &[T],
        ij: &mut [T],
    ) {
        if block_size == 0 {
            return;
        }
        // All three slices must be square matrices of side `block_size`.
        if (ik.len() != block_size * block_size) || (ik.len() != kj.len()) || (ik.len() != ij.len()) {
            return;
        }
        // Pair each row of `ij` with the matching row of `ik`.
        for (ij, ik) in ij.chunks_exact_mut(block_size).zip(ik.chunks_exact(block_size)) {
            // Pair each element of the `ik` row with the matching row of `kj`.
            for (&ik, kj) in ik.iter().zip(kj.chunks_exact(block_size)) {
                // Element-wise min-plus update of the `ij` row.
                for (ij, &kj) in ij.iter_mut().zip(kj.iter()) {
                    *ij = std::cmp::min(*ij, ik + kj);
                }
            }
        }
    }
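
For comparison, here is a sketch of my own of the index-based triple loop with i outermost, which the iterator version above mirrors (the original indexed code isn't shown in this thread; row-major block_size × block_size slices are assumed):

    // Index-based equivalent: i outermost, so each outer iteration keeps
    // working on the same row of `ij` (the point of swapping i and k).
    for i in 0..block_size {
        for k in 0..block_size {
            let ik_val = ik[i * block_size + k];
            for j in 0..block_size {
                let idx = i * block_size + j;
                ij[idx] = std::cmp::min(ij[idx], ik_val + kj[k * block_size + j]);
            }
        }
    }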

The first loop, for (ij, ik) in ij.chunks_exact_mut(block_size).zip(ik.chunks_exact(block_size)), divides ij and ik into chunks of the same size and zips those chunks together (the first chunk of ij is paired with the first chunk of ik, the second chunk of ij with the second chunk of ik, etc.). It yields an iterator of block_size pairs of chunks (because the length of each slice is the square of block_size).

Example

For instance, if BLOCK_SIZE is equal to 4 and we have the following arrays, the first loop will generate the following iterator:

Input:

ij = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16]
ik = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16]
kj = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16]
Iterator:
The first loop will iterate over an iterator of 4 pairs of chunks (see below). The elements of each pair are the chunks themselves, not the values inside: the whole ik row is one element of the pair and the whole ij row is the other.

Pair 1: ik = [1, 2, 3, 4], ij = [1, 2, 3, 4]
Pair 2: ik = [5, 6, 7, 8], ij = [5, 6, 7, 8]
Pair 3: ik = [9, 10, 11, 12], ij = [9, 10, 11, 12]
Pair 4: ik = [13, 14, 15, 16], ij = [13, 14, 15, 16]
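
A minimal way to see those pairs for yourself (a toy snippet of mine, reusing the 4×4 data; immutable chunks are enough for printing):

    let ij: Vec<i32> = (1..=16).collect();
    let ik: Vec<i32> = (1..=16).collect();
    // Prints the four (ik row, ij row) pairs produced by the outer zip.
    for (ij_row, ik_row) in ij.chunks_exact(4).zip(ik.chunks_exact(4)) {
        println!("ik = {:?}, ij = {:?}", ik_row, ij_row);
    }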

The second loop, for (&ik,kj) in ik.iter().zip(kj.chunks_exact(block_size)), iterates over all the elements of the current chunk of ik and zips each of them with a chunk of kj.

Example

For this loop we will only consider the first element of the iterator generated by the outermost loop.
Input:

ij = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16]
ik = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16]
kj = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16]

First element of the outer iterator:

ik = [1, 2, 3, 4], ij = [1, 2, 3, 4]

For this one we will associate each value of ik with a chunk of kj (see below). In this case the elements of the iterator are pairs of one value (from ik) and one chunk (from kj). For the next elements of the outer iterator the values of ik would have been (5, 6, 7, 8 | 9, 10, 11, 12 | 13, 14, 15, 16); the kj chunks are always the same.

ik = 1, kj = [1, 2, 3, 4]
ik = 2, kj = [5, 6, 7, 8]
ik = 3, kj = [9, 10, 11, 12]
ik = 4, kj = [13, 14, 15, 16]
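
Again, a small snippet of my own to print these (value, chunk) pairs:

    let ik_row = [1, 2, 3, 4];
    let kj: Vec<i32> = (1..=16).collect();
    // Each element of the `ik` row is paired with one row of `kj`.
    for (&ik, kj_row) in ik_row.iter().zip(kj.chunks_exact(4)) {
        println!("ik = {}, kj = {:?}", ik, kj_row);
    }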

The last loop zips all the elements of ij and kj together and updates each element of ij using the current value of ik.

Example

For this loop we will only consider the first element of the iterator generated by the outermost loop.
Input:

ij = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16]
ik = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16]
kj = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16]

First element of the outer iterator:

ik = [1, 2, 3, 4], ij = [1, 2, 3, 4]

Second iterator:

ik = 1, kj = [1, 2, 3, 4]
ik = 2, kj = [5, 6, 7, 8]
ik = 3, kj = [9, 10, 11, 12]
ik = 4, kj = [13, 14, 15, 16]

Last iterators:

For ik=1: zip(ij, kj) = [(1,1), (2,2), (3,3), (4,4)]
For ik=2: zip(ij, kj) = [(1,5), (2,6), (3,7), (4,8)]
For ik=3: zip(ij, kj) = [(1,9), (2,10), (3,11), (4,12)]
For ik=4: zip(ij, kj) = [(1,13), (2,14), (3,15), (4,16)]

So:

ij(1) = min(ij(1), ik(1)+kj(1), ik(2)+kj(5), ik(3)+kj(9), ik(4)+kj(13))
ij(2) = min(ij(2), ik(1)+kj(2), ik(2)+kj(6), ik(3)+kj(10), ik(4)+kj(14))
ij(3) = min(ij(3), ik(1)+kj(3), ik(2)+kj(7), ik(3)+kj(11), ik(4)+kj(15))
ij(4) = min(ij(4), ik(1)+kj(4), ik(2)+kj(8), ik(3)+kj(12), ik(4)+kj(16))
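
To check the walkthrough end to end, here is a small test of my own (not from the original post) that runs array_sums_ on the 4×4 example data. With these particular inputs every existing ij entry is strictly smaller than all of its ik + kj candidates, so the slice comes back unchanged:

    fn main() {
        let data: Vec<i32> = (1..=16).collect();
        let ik = data.clone();
        let kj = data.clone();
        let mut ij = data.clone();
        array_sums_(4, &ik, &kj, &mut ij);
        // Here ij(i,j) = 4(i-1)+j while the cheapest candidate is
        // ik(i,1) + kj(1,j) = 4(i-1)+j+1, so no entry is updated.
        assert_eq!(ij, data);
    }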


This answer is incredible!

I have only one problem: I'm using f32; is there any way to adapt the Ord bound to work with f32?

That way I can test this algorithm and comment on how it worked.

Thank you very much!
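
(A quick note of my own, ahead of the replies below: f32 implements only PartialOrd, because NaN breaks total ordering, but the standard library offers f32::min, which the next post uses, and f32::total_cmp since Rust 1.62:)

    let a: f32 = 1.0;
    let b: f32 = 2.0;
    // f32::min needs no Ord bound and returns the other operand if one is NaN.
    assert_eq!(a.min(b), 1.0);
    // total_cmp provides a total order when Ord-like behavior is required.
    assert_eq!(a.total_cmp(&b), std::cmp::Ordering::Less);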

First, I want to say that you are amazing. Your version is extremely fast; I can't believe it.

To make it work with f32, I had to switch to the following:

    const BLOCK_SIZE: usize = 256; // defined elsewhere in my code

    pub fn run(ij: &mut [f32], ik: &[f32], kj: &[f32], path: &mut [i32]) {
        for (k, ((ij, path), ik)) in ij
            .chunks_exact_mut(BLOCK_SIZE)
            .zip(path.chunks_exact_mut(BLOCK_SIZE))
            .zip(ik.chunks_exact(BLOCK_SIZE))
            .enumerate()
        {
            for (&ik, kj) in ik.iter().zip(kj.chunks_exact(BLOCK_SIZE)) {
                for ((ij, path), &kj) in ij.iter_mut().zip(path.iter_mut()).zip(kj.iter()) {
                    let sum = ik + kj;
                    // f32::min works with PartialOrd, so no Ord bound is needed.
                    *ij = ij.min(sum);
                    // Record the k iteration that produced the current minimum.
                    if (*ij).eq(&sum) {
                        *path = k as i32;
                    }
                }
            }
        }
    }

As you will see, I added a new array (path). The idea is to store the k iteration whenever ij has been updated. Do you think it can be done in a better way? Thanks a lot.

If I'm reading your code correctly, your operation is called min-plus matrix multiplication. Unlike normal matrix multiplication, no truly subcubic time algorithm is known for this problem.

Because the memory access pattern is the same as for usual matrix multiplication, all the usual optimization techniques (such as strip mining) should apply, but the details depend on your BLOCK_SIZE. It is probably not a bad idea to adapt existing tuned matrix multiplication code.
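
(To make "strip mining" concrete, here is a sketch of my own, not code from this thread: the j dimension is processed in fixed-width strips so each inner loop touches a narrow, cache- and vector-friendly band. STRIP is a hypothetical tile width:)

    // Min-plus product over row-major n*n matrices, with the j loop
    // split into strips of width STRIP (loop tiling / strip mining).
    const STRIP: usize = 64;

    fn min_plus_strips(n: usize, a: &[f32], b: &[f32], c: &mut [f32]) {
        for j0 in (0..n).step_by(STRIP) {
            let j1 = (j0 + STRIP).min(n);
            for i in 0..n {
                for k in 0..n {
                    let aik = a[i * n + k];
                    for j in j0..j1 {
                        // c[i][j] = min(c[i][j], a[i][k] + b[k][j])
                        c[i * n + j] = c[i * n + j].min(aik + b[k * n + j]);
                    }
                }
            }
        }
    }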

About path reconstruction: if the compiler doesn't convert your branch into a branchless operation, auto-vectorization will be blocked and the performance will drop significantly. It seems that auto-vectorization does apply in this case as long as an appropriate target-cpu is specified (https://rust.godbolt.org/z/3GcxM4), but you probably want to check your assembly output to make sure.

Yes!

I didn't know the algorithm was called that.

Exactly, it should do that, but with a min instead of the sum reduction used in ordinary matrix multiplication. At the moment, the @Vincent_0 algorithm modified to work with f32 works very well. Can you think of any improvements? I think the path assignment I am doing can be improved.

It seems that the k should be on the second loop, not on the first loop, which iterates over i. In fact, the index corresponds to the letter the two zipped arrays have in common (for ij and ik it is i; for ik and kj it is k, etc.).

I am not sure it is easy to improve your code with the path. You can try to rewrite the conditional statement, but I am not convinced it will improve the performance. Here is a way to rewrite the condition (maybe it is faster, maybe it is slower, maybe it is the same):

    const BLOCK_SIZE: usize = 256;

    pub fn run(ij: &mut [f32], ik: &[f32], kj: &[f32], path: &mut [u32]) {
        // All four slices must be square matrices of side BLOCK_SIZE.
        if (ik.len() != BLOCK_SIZE * BLOCK_SIZE)
            || (ik.len() != kj.len())
            || (ik.len() != ij.len())
            || (ik.len() != path.len())
        {
            return;
        }
        for ((ij, path), ik) in ij
            .chunks_exact_mut(BLOCK_SIZE)
            .zip(path.chunks_exact_mut(BLOCK_SIZE))
            .zip(ik.chunks_exact(BLOCK_SIZE))
        {
            // Enumerate with a u32 counter instead of enumerate()'s usize.
            for (k, (&ik, kj)) in ik.iter().zip(kj.chunks_exact(BLOCK_SIZE)).zip(0u32..).map(|(x, y)| (y, x)) {
                for ((ij, path), &kj) in ij.iter_mut().zip(path.iter_mut()).zip(kj.iter()) {
                    let sum = ik + kj;
                    *ij = ij.min(sum);
                    let update_path = if (*ij).eq(&sum) { 1u32 } else { 0u32 };
                    // *path stays equal to *path when update_path is 0,
                    // and becomes k when update_path is 1.
                    *path = (1u32 - update_path) * *path + update_path * k;
                }
            }
        }
    }

.zip(0u32..).map(|(x,y)| (y,x)) is equivalent to enumerate(), but with u32 instead of usize.
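
A tiny check of that equivalence (my own example):

    let v = [10, 20, 30];
    // Same pairs as v.iter().enumerate(), but the index is a u32.
    let pairs: Vec<(u32, &i32)> = v.iter().zip(0u32..).map(|(x, y)| (y, x)).collect();
    assert_eq!(pairs, [(0, &10), (1, &20), (2, &30)]);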

I tried your code, but the previous one works better; the new solution is only slightly worse. I don't think it is necessary to dig deeper into the path handling, because it doesn't hurt the performance of the algorithm too much.

Thanks for everything!
