Profiling parallelized pixel computations with rayon

Hello,

I have split an image into block-sized chunks (index-wise) and am trying to parallelize a trivial operation, but I get a compiler error that I don't know how to work around.
The pixel data lives entirely in a single flat Vec.

The identical sequential code runs without any issues whatsoever, but when I add a rayon parallel for loop over disjoint index ranges (which should be thread safe) I get the following compile error:

buf[i] = modified;
^^^ cannot borrow as mutable

The function in question is here:

fn profile_parallel(num_samples: u32, buf_orig: &mut Vec<u8>, width: u32, height: u32, nc: u32, increment: u8) -> Vec<Duration> {
    let mut durations_vec: Vec<Duration> = Vec::new();

    let block_size: u32 = 64;
    let mut num_width_blocks = width / block_size as u32;
    let rem_width_blocks = width % block_size as u32;
    let mut num_height_blocks = height / block_size as u32;
    let rem_height_blocks = height % block_size as u32;

    if rem_width_blocks > 0 {
        num_width_blocks += 1;
    }
    if rem_height_blocks > 0 {
        num_height_blocks += 1;
    }

    let coordinates: Vec<(u32, u32)> =
        (0..num_width_blocks)
        .flat_map(|x| (0..num_height_blocks)
        .map(move |y| (x, y)))
        .collect();

    for _i in 1..num_samples {
        // prepare for calculations
        let mut buf = buf_orig.clone();

        // start timer
        let t_start1 = Instant::now();

        coordinates.par_iter().for_each(|&(block_x, block_y)| {
            // Process each block
            let x0 = block_x * block_size;
            let y0 = block_y * block_size;
            let x1 = cmp::min(x0 + block_size, width);
            let y1 = cmp::min(y0 + block_size, height);
            //println!("Processing coordinate ({}, {} -> {}, {})", x0, y0, x1, y1);

            // calculate
            for y in y0 .. y1-1 {
                for x in x0 .. x1-1 {
                    let index = (y * width + x) * nc;
                    for j in 0 .. nc-2 {
                        let i : usize = (index + j) as usize;
                        let val : u8 = buf[i];
                        let modified : u8 = val.saturating_add(increment);
                        buf[i] = modified;
                    }
                }
            }

        });
    
        // store result
        let d_t1 = t_start1.elapsed();    
        durations_vec.push(d_t1);
    }

    durations_vec
}

What do I need to do to be allowed to perform the assignment buf[i] = modified?

Thanks for anything...

You can't, because rayon::iter::ParallelIterator::for_each takes a closure that implements Fn, not FnMut. Fn closures can't mutate the state they capture, whereas FnMut closures can.
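
For illustration, here is a minimal hypothetical sketch of that distinction (not taken from your code): the sequential for_each compiles because Iterator::for_each accepts an FnMut closure, while the parallel for_each only accepts Fn (plus Send + Sync) and therefore rejects the mutable capture:

fn main() {
    let mut buf = vec![0u8; 4];

    // Compiles: `Iterator::for_each` takes an `FnMut` closure, so it may
    // mutably borrow the captured `buf`.
    (0..buf.len()).for_each(|i| buf[i] = buf[i].saturating_add(1));

    // Rejected if uncommented: rayon's parallel `for_each` requires an `Fn`
    // closure, which cannot hold the unique `&mut buf` borrow -- the same
    // "cannot borrow as mutable" error as in your function.
    //
    // use rayon::prelude::*;
    // (0..buf.len()).into_par_iter().for_each(|i| {
    //     buf[i] = buf[i].saturating_add(1);
    // });

    println!("{:?}", buf);
}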

I personally would probably just store AtomicU8s in your buffer to circumvent the restriction (atomic types have somewhat of the same ergonomics as interior mutability in the sense that you can mutate them without them needing to be declared as mutable):

use std::cmp;
use std::sync::atomic::{AtomicU8, Ordering};
use std::time::{Duration, Instant};

use rayon::prelude::*;

pub fn profile_parallel(
    num_samples: u32,
    buf: &[AtomicU8],
    width: u32,
    height: u32,
    nc: u32,
    increment: u8,
) -> Vec<Duration> {
    let mut durations_vec: Vec<Duration> = Vec::new();

    let block_size: u32 = 64;
    let mut num_width_blocks = width / block_size as u32;
    let rem_width_blocks = width % block_size as u32;
    let mut num_height_blocks = height / block_size as u32;
    let rem_height_blocks = height % block_size as u32;

    if rem_width_blocks > 0 {
        num_width_blocks += 1;
    }
    if rem_height_blocks > 0 {
        num_height_blocks += 1;
    }

    let coordinates: Vec<(u32, u32)> = (0..num_width_blocks)
        .flat_map(|x| (0..num_height_blocks).map(move |y| (x, y)))
        .collect();

    for _i in 1..num_samples {
        // start timer
        let t_start1 = Instant::now();

        coordinates.par_iter().for_each(|&(block_x, block_y)| {
            // Process each block
            let x0 = block_x * block_size;
            let y0 = block_y * block_size;
            let x1 = cmp::min(x0 + block_size, width);
            let y1 = cmp::min(y0 + block_size, height);
            //println!("Processing coordinate ({}, {} -> {}, {})", x0, y0, x1, y1);

            // calculate
            for y in y0..y1 - 1 {
                for x in x0..x1 - 1 {
                    let index = (y * width + x) * nc;
                    for j in 0..nc - 2 {
                        let i: usize = (index + j) as usize;
                        let val: u8 = buf[i].load(Ordering::Relaxed);
                        let modified: u8 = val.saturating_add(increment);
                        buf[i].store(modified, Ordering::Relaxed);
                    }
                }
            }
        });

        // store result
        let d_t1 = t_start1.elapsed();
        durations_vec.push(d_t1);
    }

    durations_vec
}

Playground.

(You might need to change the Ordering to something more strict to get the desired output)

(It is interior mutability.)

Oh, indeed. Thanks for educating me once again! :slightly_smiling_face:

Thanks for the help @jofas

I have now completed my very first Rust program, which profiles the parallel throughput of this image manipulation.
It is supposed to be an in-place operation, but the benefit is not as large as I expected. I guess it could be cache misses, due to the flat layout of the image and the fact that I work over a 2D kernel patch instead of either splitting it over consecutive pixels or cutting the image into smaller 64x64 patches before performing the operation.
I have no idea what cost the AtomicU8 API entails, since I'm an absolute beginner in Rust.
The Relaxed ordering seems appropriate since I have no overlaps, so the code should run without any race conditions at all.
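
For comparison, here is a hypothetical sketch (not the code I'm actually running) of an AtomicU8-free variant: rayon's par_chunks_mut hands each task its own disjoint &mut [u8] slice, so the kernel can write plain u8 values and nothing needs to be atomic:

use rayon::prelude::*;

// Hypothetical, simplified version of the increment kernel, working over
// disjoint mutable chunks instead of shared AtomicU8s. `pixels_per_task`
// plays the role of batch_size and is assumed to be nonzero; nc is assumed
// to be >= 3 (so `nc - 2` channels are touched, like above) and buf.len()
// is assumed to be width * height * nc, so every chunk covers whole pixels.
fn increment_chunks(buf: &mut [u8], nc: usize, pixels_per_task: usize, increment: u8) {
    buf.par_chunks_mut(pixels_per_task * nc).for_each(|chunk| {
        for pixel in chunk.chunks_mut(nc) {
            for v in pixel.iter_mut().take(nc - 2) {
                *v = v.saturating_add(increment);
            }
        }
    });
}

Since the chunks are disjoint there is nothing to synchronize, so timing this against the AtomicU8 version should show how much of the remaining gap is memory bandwidth rather than the atomics themselves.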

In any case I have 12 cores and it runs as follows.

Number of logical cores: 12
Performing performance analysis on parallel convolutional kernels (64x64)
Number of sample runs: 1000
Sequential time: 991.414µs, standard deviation:  266.411µs
Parallel  time: 386.222µs, standard deviation:  29.633µs

I guess I can check if it's mostly related to cache misses by working over 4096 pixels sequentially for each core instead.

EDIT (hopefully my last): I have now modified the code for better cache locality, and it performs the same as the original block implementation. (I have stored the output image to verify that I didn't mess anything up.)
So in this case it seems to me that either there is some overhead with the AtomicU8 solution, or the threads just mess up the cache anyway.

// Deep-copies the atomic buffer so every sample run starts from the
// original image data (Vec<AtomicU8> can't be cloned directly, since
// AtomicU8 doesn't implement Clone).
fn cp_atomic_u8(buf: &Vec<AtomicU8>) -> Vec<AtomicU8> {
    let mut res: Vec<AtomicU8> = Vec::with_capacity(buf.len());
    for x in buf {
        let v: u8 = x.load(Ordering::Relaxed);
        res.push(AtomicU8::new(v));
    }
    res
}

fn profile_parallel(num_samples: u32, buf_orig: &mut Vec<AtomicU8>, width: u32, height: u32, nc: u32, batch_size: u32, increment: u8) -> Vec<Duration> {
    let mut durations_vec: Vec<Duration> = Vec::new();

    let num_pixels = width * height;
    let mut num_pixel_chunks = num_pixels / batch_size as u32;
    let rem_pixel_chunks = num_pixels % batch_size as u32;
    if rem_pixel_chunks > 0 {
        num_pixel_chunks += 1;
    }

    let locations: Vec<(u32, u32)> = (0..num_pixel_chunks)
        .map(|x| (x * batch_size, cmp::min((x + 1) * batch_size, num_pixels)))
        .collect();
    //println!("{:?}", locations);

    for _i in 1..num_samples {
        // prepare for calculations
        let buf = cp_atomic_u8(buf_orig);

        // start timer
        let t_start1 = Instant::now();

        locations.par_iter().for_each(|&(x0, x1)| {
            // Process each block
            for x in x0 .. x1-1 {
                let index = x * nc;
                for j in 0 .. nc-2 {
                    let i : usize = (index + j) as usize;
                    let val : u8 = buf[i].load(Ordering::Relaxed);
                    let modified : u8 = val.saturating_add(increment);
                    buf[i].store(modified, Ordering::Relaxed);
                }
            }
        });
    
        // store result
        let d_t1 = t_start1.elapsed();    
        durations_vec.push(d_t1);
    }

    durations_vec
}

This resulted in:

Number of logical cores: 12
Performing performance analysis on parallel code with batch_size: 64
Number of samples: 1000
Sequential time: 939.761µs, standard deviation:  154.433µs
Parallel  time: 424.494µs, standard deviation:  40.33µs


Number of logical cores: 12
Performing performance analysis on parallel code with batch_size: 4096
Number of samples: 1000
Sequential time: 910.75µs, standard deviation:  85.168µs
Parallel  time: 393.208µs, standard deviation:  29.587µs

Thanks again :slight_smile:

Maybe I'm misinterpreting what you're measuring, but it seems to me that the "improved" version is 4x slower than the previous one.

Already changed... I edited it.

I had messed up the batch_size... it's better now that I've changed it from 64 to 4096.
