Another multi-threading for loop

Hi,

I am trying again to parallelise the below code, which is giving some headache.

The loop to parallelise is in the function extremas.
Basically, I am trying to build a function that finds the min and max of a vector in parallel.

Thank you in advance.

use rand::Rng;

/// Splits the index range `0..len` into at most `n` contiguous chunks.
///
/// Each chunk is an inclusive `(first, last)` pair of indices. Every chunk
/// except the last one covers `len / n` indices; the last chunk additionally
/// absorbs the remainder, so together the chunks cover `0..len` exactly once
/// and never overlap.
///
/// An inclusive pair cannot describe an empty chunk, so when `n` exceeds
/// `len` only `len` one-element chunks are produced, and an empty vector is
/// returned when either `len` or `n` is zero.
fn make_chunk(len: usize, n: usize) -> Vec<(usize, usize)> {
    // Never ask for more chunks than there are indices to hand out.
    let n = n.min(len);
    if n == 0 {
        return Vec::new();
    }

    let m = len / n;
    (0..n)
        .map(|i| {
            let first = i * m;
            // The final chunk runs to the end so that the remainder of the
            // division is not dropped.
            let last = if i + 1 == n { len - 1 } else { first + m - 1 };
            (first, last)
        })
        .collect()
}

/// Returns `(smallest element of x, largest element of y)`.
///
/// The two slices are scanned independently, so they do not need to have the
/// same length.
///
/// # Panics
///
/// Panics if `x` or `y` is empty, because the `(u64, u64)` return type has no
/// way to express "no minimum" or "no maximum".
fn min_max(x: &[u64], y: &[u64]) -> (u64, u64) {
    let smallest = x
        .iter()
        .copied()
        .min()
        .expect("min_max: `x` must not be empty");
    let largest = y
        .iter()
        .copied()
        .max()
        .expect("min_max: `y` must not be empty");
    (smallest, largest)
}

/// Computes the overall `(min, max)` of `x` by scanning the first `k` entries
/// of `chunks` one after another and then combining the per-chunk results
/// with `min_max`.
///
/// Each entry of `chunks` is an inclusive `(first, last)` index pair into `x`.
/// Every per-chunk scan starts from `x[0]` as its initial minimum and maximum,
/// so `x` must be non-empty and `chunks` must hold at least `k` entries.
fn extremas(x: &[u64], chunks: &Vec<(usize, usize)>, k: usize) -> (u64, u64) {
    let (minr, maxr): (Vec<u64>, Vec<u64>) = (0..k)
        .map(|idx| {
            // Seed first, then look up the chunk bounds.
            let seed = x[0];
            let (first, last) = chunks[idx];
            (first..=last)
                .map(|pos| x[pos])
                .fold((seed, seed), |(lo, hi), v| (lo.min(v), hi.max(v)))
        })
        .unzip();
    min_max(&minr, &maxr)
}

/// Demo driver: fills a small vector with random values, splits its index
/// range into one chunk per thread and prints the vector together with its
/// minimum and maximum.
fn main() {
    let threads = 4_usize; // number of threads
    let size = 9_usize;

    // Values are drawn uniformly from 0..5.
    let dist = rand::distributions::Uniform::new(0, 5);
    let mut generator = rand::thread_rng();
    let x = (0..size)
        .map(|_| generator.sample(dist))
        .collect::<Vec<u64>>();

    let bounds = make_chunk(size, threads);

    println!("x = {:?}", x);
    println!("(min, max) = {:?}", extremas(&x, &bounds, threads));
}

Let’s walk through the process of turning this into parallel code (with minimal changes from your original code, and using manual threads and the standard library instead of rayon’s thread pool) step by step.

The first step is to set up a thread::scope and spawn a thread for every loop iteration that ought to happen in parallel

/// First parallelisation attempt: the sequential loop body is moved, unchanged,
/// into one scoped thread per iteration.
///
/// This version is intentionally left in its non-compiling state. The borrow
/// checker rejects it with E0499 because every spawned closure uses `minr` and
/// `maxr` mutably, so the second loop iteration asks for a mutable borrow that
/// the closure from the previous iteration still holds.
fn extremas(x: &[u64], chunks: &Vec<(usize, usize)>, k: usize) -> (u64, u64) {
    let mut minr = vec![0_u64; k];
    let mut maxr = vec![0_u64; k];
    thread::scope(|s| {
        for i in 0..k {//       <- Loop to  do in parallel
            s.spawn(|| {
                let mut a = x[0];
                let mut b = x[0];
                for j in (chunks[i].0)..=(chunks[i].1) {
                    if x[j] > b {
                        b = x[j];
                    } else if x[j] < a {
                        a = x[j];
                    }
                }
                // These two writes are the uses of `minr`/`maxr` that the
                // compiler reports as the source of the conflicting borrows.
                minr[i] = a;
                maxr[i] = b;
            });
        }
    });
    min_max(&minr, &maxr)
}

This results in errors including these ones:

error[E0499]: cannot borrow `minr` as mutable more than once at a time
  --> src/main.rs:28:21
   |
26 |       thread::scope(|s| {
   |                      - has type `&'1 Scope<'1, '_>`
27 |           for i in 0..k {//       <- Loop to  do in parallel
28 |               s.spawn(|| {
   |               -       ^^ `minr` was mutably borrowed here in the previous iteration of the loop
   |  _____________|
   | |
29 | |                 let mut a = x[0];
30 | |                 let mut b = x[0];
31 | |                 for j in (chunks[i].0)..=(chunks[i].1) {
...  |
38 | |                 minr[i] = a;
   | |                 ---- borrows occur due to use of `minr` in closure
39 | |                 maxr[i] = b;
40 | |             });
   | |______________- argument requires that `minr` is borrowed for `'1`

error[E0499]: cannot borrow `maxr` as mutable more than once at a time
  --> src/main.rs:28:21
   |
26 |       thread::scope(|s| {
   |                      - has type `&'1 Scope<'1, '_>`
27 |           for i in 0..k {//       <- Loop to  do in parallel
28 |               s.spawn(|| {
   |               -       ^^ `maxr` was mutably borrowed here in the previous iteration of the loop
   |  _____________|
   | |
29 | |                 let mut a = x[0];
30 | |                 let mut b = x[0];
31 | |                 for j in (chunks[i].0)..=(chunks[i].1) {
...  |
39 | |                 maxr[i] = b;
   | |                 ---- borrows occur due to use of `maxr` in closure
40 | |             });
   | |______________- argument requires that `maxr` is borrowed for `'1`

The problem is that minr and maxr are now accessed mutably from multiple threads, so we need to split them up in a way that lets the compiler understand that what we’re doing is fine (since each thread only touches its own entry in those Vecs). We can do that as in my previous post, where a loop was parallelized by using an iterator instead of indexing, and using iter::zip to iterate over multiple things in lock-step, just as for i in 0..k { … minr[i] … maxr[i] … } does.

/// Second attempt: instead of indexing `minr[i]` / `maxr[i]`, the loop zips
/// mutable iterators over both vectors so that each spawned closure receives
/// its own pair of element references.
///
/// This version is intentionally left in its non-compiling state: the loop no
/// longer binds an index, yet the body still reads `chunks[i]`, which the
/// compiler rejects with E0425 (`i` not found in this scope).
fn extremas(x: &[u64], chunks: &Vec<(usize, usize)>, k: usize) -> (u64, u64) {
    let mut minr = vec![0_u64; k];
    let mut maxr = vec![0_u64; k];
    thread::scope(|s| {
        // One (`&mut minr[..]`, `&mut maxr[..]`) element pair per iteration.
        for (minr_i, maxr_i) in std::iter::zip(&mut minr, &mut maxr) {
            s.spawn(|| {
                let mut a = x[0];
                let mut b = x[0];
                // `i` is not defined any more at this point.
                for j in (chunks[i].0)..=(chunks[i].1) {
                    if x[j] > b {
                        b = x[j];
                    } else if x[j] < a {
                        a = x[j];
                    }
                }
                // The zipped iterator yields references, hence the dereference.
                *minr_i = a;
                *maxr_i = b;
            });
        }
    });
    min_max(&minr, &maxr)
}
error[E0425]: cannot find value `i` in this scope
  --> src/main.rs:31:34
   |
31 |                 for j in (chunks[i].0)..=(chunks[i].1) {
   |                                  ^ help: a local variable with a similar name exists: `a`

Note how the access minr[i] becomes *minr_i now, where we’ll have to dereference the reference the zipped iterator gave us.

We don’t have i anymore, but we can get it back via Iterator::enumerate, if we want to. Alternatively, we could use yet another zip to iterate over chunks as well. To stay “closer” to your original code, and to demonstrate something different, I’ll use enumerate, but the other approach would probably be preferred by many Rust users. For zipping more than 2 things, there are also convenient macros in other crates, such as izip in itertools - Rust.

/// Third attempt: `enumerate` is added to the zipped iterator so that the
/// chunk index `i` is available again next to the two element references.
///
/// This version is intentionally left in its non-compiling state: the closure
/// passed to `spawn` is not a `move` closure, so it borrows the loop variable
/// `i`, and the compiler rejects that with E0373 (closure may outlive the
/// borrowed value) while suggesting the `move` keyword.
fn extremas(x: &[u64], chunks: &Vec<(usize, usize)>, k: usize) -> (u64, u64) {
    let mut minr = vec![0_u64; k];
    let mut maxr = vec![0_u64; k];
    thread::scope(|s| {
        // `i` counts the element pairs, mirroring the original `for i in 0..k`.
        for (i, (minr_i, maxr_i)) in std::iter::zip(&mut minr, &mut maxr).enumerate() {
            s.spawn(|| {
                let mut a = x[0];
                let mut b = x[0];
                // This use of `i` is the borrow the compiler complains about.
                for j in (chunks[i].0)..=(chunks[i].1) {
                    if x[j] > b {
                        b = x[j];
                    } else if x[j] < a {
                        a = x[j];
                    }
                }
                *minr_i = a;
                *maxr_i = b;
            });
        }
    });
    min_max(&minr, &maxr)
}
error[E0373]: closure may outlive the current function, but it borrows `i`, which is owned by the current function
  --> src/main.rs:28:21
   |
26 |     thread::scope(|s| {
   |                    - has type `&'1 Scope<'1, '_>`
27 |         for (i, (minr_i, maxr_i)) in std::iter::zip(&mut minr, &mut maxr).enumerate() {
28 |             s.spawn(|| {
   |                     ^^ may outlive borrowed value `i`
...
31 |                 for j in (chunks[i].0)..=(chunks[i].1) {
   |                                  - `i` is borrowed here
   |
note: function requires argument type to outlive `'1`
  --> src/main.rs:28:13
   |
28 | /             s.spawn(|| {
29 | |                 let mut a = x[0];
30 | |                 let mut b = x[0];
31 | |                 for j in (chunks[i].0)..=(chunks[i].1) {
...  |
39 | |                 *maxr_i = b;
40 | |             });
   | |______________^
help: to force the closure to take ownership of `i` (and any other referenced variables), use the `move` keyword
   |
28 |             s.spawn(move || {
   |                     ++++

Unlike the trickery that I’ve discussed in your previous thread, for capturing an index in the closure, like i, there’s no (good) way around converting our code into a move closure, so we’ll do that, as the compiler suggests.

/// Computes the overall `(min, max)` of `x` with one scoped thread per chunk.
///
/// Each of the first `k` entries of `chunks` is an inclusive `(first, last)`
/// index pair into `x`. Every thread scans its own chunk, starting from `x[0]`
/// as the initial minimum and maximum, and writes its result into its own
/// slot of the two result vectors; `min_max` then combines the slots once the
/// scope has ended.
fn extremas(x: &[u64], chunks: &Vec<(usize, usize)>, k: usize) -> (u64, u64) {
    let mut minr = vec![0_u64; k];
    let mut maxr = vec![0_u64; k];
    thread::scope(|s| {
        // Pair up one mutable slot from each result vector per thread.
        let slots = minr.iter_mut().zip(maxr.iter_mut());
        for (i, (min_slot, max_slot)) in slots.enumerate() {
            // `move` hands the index and both slot references to the thread.
            s.spawn(move || {
                let seed = x[0];
                let (first, last) = chunks[i];
                let (lo, hi) = (first..=last)
                    .map(|pos| x[pos])
                    .fold((seed, seed), |(lo, hi), v| (lo.min(v), hi.max(v)));
                *min_slot = lo;
                *max_slot = hi;
            });
        }
    });
    min_max(&minr, &maxr)
}

And voilà, we’re already finished. That was even more hassle-free than anticipated :slight_smile:

6 Likes

Thank you @steffahn
I need to study your code more.
In the meantime, I wanted to ask you.

  • do you provide Rust training classes? I see that you are based in Japan. I am based in London (UK). If so, I would need to ask some of my colleagues whether they are interested, as I don't think I would be able to absorb the cost on my own.
  • is there any book on Rust that you recommend?
    Thank you again.

And here's an alternative formulation that uses the fact that threads can return values when joined:

/// Folds a sequence of `(min, max)` pairs into a single pair holding the
/// smallest first component and the largest second component.
///
/// # Panics
///
/// Panics if `iter` yields no items.
fn reduce_min_max(iter: impl IntoIterator<Item = (u64, u64)>) -> (u64, u64) {
    let mut pairs = iter.into_iter();
    // The first pair seeds the running result.
    let first = pairs.next().unwrap();
    pairs.fold(first, |(lo, hi), (next_lo, next_hi)| {
        (lo.min(next_lo), hi.max(next_hi))
    })
}

/// Computes the overall `(min, max)` of `x` by spawning one scoped thread per
/// entry of `chunks` and combining the values the threads return when joined.
///
/// Each entry of `chunks` is an inclusive `(start, end)` index pair into `x`.
fn extremas(x: &[u64], chunks: &Vec<(usize, usize)>) -> (u64, u64) {
    // A scope lets the threads borrow `x`, which is not a `'static` reference.
    thread::scope(|s| {
        // Slice out each chunk and immediately hand it to its own thread.
        let handles: Vec<_> = chunks
            .iter()
            .map(|&(start, end)| {
                let part = &x[start..=end];
                // `reduce_min_max` works on (min, max) pairs, so every value
                // is paired with itself.
                s.spawn(move || reduce_min_max(part.iter().map(|&v| (v, v))))
            })
            .collect();

        // Join every thread and merge the per-chunk results.
        reduce_min_max(handles.into_iter().map(|handle| handle.join().unwrap()))
    })
}

Or, using the built-in chunks() instead of your hand-written make_chunk:

use rand::Rng;
use std::thread;

/// Collapses a sequence of `(min, max)` pairs into one pair: the smallest of
/// all first components and the largest of all second components.
///
/// # Panics
///
/// Panics if `iter` yields no items.
fn reduce_min_max(iter: impl IntoIterator<Item = (u64, u64)>) -> (u64, u64) {
    iter.into_iter()
        .reduce(|(lo, hi), (next_lo, next_hi)| (lo.min(next_lo), hi.max(next_hi)))
        .unwrap()
}

/// Finds the minimum and maximum of `x` using at most `k` scoped threads.
///
/// `x` is cut into contiguous pieces of `ceil(x.len() / k)` elements, so the
/// number of pieces, and therefore of spawned threads, never exceeds `k`.
/// A `k` of zero is treated as one, and a `k` larger than `x.len()` gives
/// every element its own thread instead of requesting zero-sized chunks.
///
/// # Panics
///
/// Panics if `x` is empty (no thread is spawned, so there is nothing for
/// `reduce_min_max` to combine) or if one of the worker threads panics.
fn extremas(x: &[u64], k: usize) -> (u64, u64) {
    let workers = k.max(1);
    // Ceiling division without the overflow risk of `(len + workers - 1)`;
    // `slice::chunks` panics on a chunk size of zero, hence the final clamp.
    let chunk_len = (x.len() / workers + usize::from(x.len() % workers != 0)).max(1);

    thread::scope(|s| { // Lets us use x which is a non-static reference
        // Spawn a thread for each slice
        let mut threads = Vec::with_capacity(workers.min(x.len()));
        for xslice in x.chunks(chunk_len) {
            // reduce_min_max() expects each element to be a (min, max) tuple,
            // so duplicate each element.
            let iter = xslice.iter().copied().map(|v| (v, v));
            threads.push(s.spawn(move || reduce_min_max(iter)));
        }

        // Collect the results
        reduce_min_max(threads.into_iter().map(|t| t.join().unwrap()))
    })
}

/// Demo driver: fills a small vector with random values and prints it
/// together with the minimum and maximum found by `extremas`.
fn main() {
    let threads = 4_usize; // number of threads
    let size = 9_usize;

    // Values are drawn uniformly from 0..5.
    let dist = rand::distributions::Uniform::new(0, 5);
    let mut generator = rand::thread_rng();
    let x = (0..size)
        .map(|_| generator.sample(dist))
        .collect::<Vec<u64>>();

    println!("x = {:?}", x);
    println!("(min, max) = {:?}", extremas(&x, threads));
}
3 Likes

How would you change your solution if you had to pass a 3rd mutable variable in addition to minr and maxr?
std::iter::zip only accept 2 arguments.
Thank you.

I mentioned that briefly above. Either do nested calls of zip, or use the izip! macro from the itertools crate :wink:

2 Likes

You can also manually advance an iterator within the loop if you'd prefer. It's not super idiomatic Rust, but can sometimes make more complicated situations clearer:

// Walk several vectors in lock-step without `zip`: keep one mutable iterator
// per vector and pull exactly one element from each on every loop iteration.
let mut minr_iter = minr.iter_mut();
let mut maxr_iter = maxr.iter_mut();
let mut extra_iter = extra.iter_mut();
for i in 0..k {
    // Each `expect` fires if the corresponding vector has fewer than `k` items.
    let minr_i = minr_iter.next().expect("minr not long enough!");
    let maxr_i = maxr_iter.next().expect("maxr not long enough!");
    let extra_i = extra_iter.next().expect("extra not long enough!");

    s.spawn(/* ... */);
}
1 Like

Thank you @steffahn and @2e71828

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.