Sharing vector with Rayon par_iter() correctly

Please understand I know what I'm doing and I know this works!!
I'm having issues using Rayon to parallelize some simple inner loops.

Here's the original code:

fn sozpg(val: usize, res_0: usize, start_num : usize, end_num : usize) -> Vec<usize> {
  let (md, rscnt) = (30, 8);
  static RES: [usize; 8] = [7,11,13,17,19,23,29,31];
  static BITN: [u8; 30] = [0,0,0,0,0,1,0,0,0,2,0,4,0,0,0,8,0,16,0,0,0,32,0,0,0,0,0,64,0,128];
  
  let kmax = (val - 2) / md + 1; 
  let mut prms = vec![0u8; kmax];
  let sqrt_n = val.integer_sqrt();
  let (mut modk, mut r, mut k) = (0, 0, 0 );
  
  loop {
    if r == rscnt { r = 0; modk += md; k += 1 }
    if (prms[k] & (1 << r)) != 0 { r += 1; continue }
    let prm_r = RES[r];           
    let prime = modk + prm_r;     
    if  prime > sqrt_n { break }  
    for ri in &RES {              
      let prod = prm_r * ri - 2;  
      let bit_r = BITN[prod % md];
      let mut kpm = k * (prime + ri) + prod / md;
      while kpm < kmax { prms[kpm] |= bit_r; kpm += prime };
    }
    r += 1;
  }
  let mut primes = vec![];
  for (k, resgroup) in prms.iter().enumerate() { 
    for (i, r_i) in RES.iter().enumerate() {     
      if resgroup & (1 << i) == 0 { 
        let prime = md * k + r_i;   
        let (n, rem) = (start_num / prime, start_num % prime);
        if (prime >= res_0 && prime <= val) && (prime * (n + 1) <= end_num || rem == 0) { primes.push(prime); }
  } } }
  primes
}

I want to use Rayon (I'm already using it other places) to do this loop in parallel.

for ri in &RES {              
      let prod = prm_r * ri - 2;  
      let bit_r = BITN[prod % md];
      let mut kpm = k * (prime + ri) + prod / md;
      while kpm < kmax { prms[kpm] |= bit_r; kpm += prime };
    }

Here's how I did it with Crystal, which works as intended.

def sieve(ri, md, k, prime, prm_r, kmax, bitn, prms)
  kn, rn = (prm_r * ri - 2).divmod md
  kpm = k * (prime + ri) + kn
  bit_r = bitn[rn]
  while kpm < kmax; prms[kpm] |= bit_r; kpm += prime end
end

def sozpg
  ...
  loop do  
    if (r += 1) == rscnt; r = 0; modk += md; k += 1 end
    next if prms[k] & (1 << r) != 0
    prm_r = res[r]  
    prime = modk + prm_r  
    break if prime > Math.isqrt(val)
    done = Channel(Nil).new(rscnt)
    res.each do |ri| 
      spawn do sieve(ri, md, k, prime, prm_r, kmax, bitn, prms); done.send(nil) end
    end 
    rscnt.times { done.receive }
  end

After many attempts, I got this far to get only 1 error.

    RES.par_iter().for_each ( |ri| {
      let prod = prm_r * ri - 2;  
      let bit_r = BITN[prod % md];
      let mut kpm = k * (prime + ri) + prod / md;
      while kpm < kmax { *&prms[kpm] |= bit_r; kpm += prime; };
    });

error[E0594]: cannot assign to data in a `&` reference
   --> src/main.rs:177:26
    |
177 |       while kpm < kmax { *&prms[kpm] |= bit_r; kpm += prime; };
    |                          ^^^^^^^^^^^^^^^^^^^^ cannot assign

For more information about this error, try `rustc --explain E0594`.
error: could not compile `twinprimes_ssoz` due to previous error

The compiler has been excellent in getting me down to just 1 error, but now
I'm in a loop trying to get it to allow prms to be shared and written into.

HELP!

I also want to do this in parallel too.

for (k, resgroup) in prms.iter().enumerate() { 
    for (i, r_i) in RES.iter().enumerate() {     
      if resgroup & (1 << i) == 0 { 
        let prime = md * k + r_i;   
        let (n, rem) = (start_num / prime, start_num % prime);
        if (prime >= res_0 && prime <= val) && (prime * (n + 1) <= end_num || rem == 0) { primes.push(prime); }
  } } }

If you want to modify something shared from within rayon, then you must split it up using their iterators and write to it through mutable references given to you by the iterators. This is necessary so that rayon can verify that you are not modifying the same index in parallel from multiple threads. Using indexing will not work regardless of how many ampersands and asterisks you are inserting.

2 Likes

If it's too difficult to rewrite your loop into something like prms.par_iter_mut() to split mutability, then another way is to use thread-safe mutability -- Mutex in general, or perhaps Vec<AtomicU8> here.

For your second example, you could use collect something like this:

let primes = prms.par_iter().enumerate().flat_map_iter(|(k, resgroup)| {
    RES.iter().enumerate().filter_map(|(i, r_i)| {
        if resgroup & (1 << i) == 0 { 
            let prime = md * k + r_i;   
            let (n, rem) = (start_num / prime, start_num % prime);
            if (prime >= res_0 && prime <= val) && (prime * (n + 1) <= end_num || rem == 0) {
                return Some(prime);
            }
        }
        None
    })
}).collect();

That worked (after compiler guidance). That shaved 3s off 22s in one example.
The real saving is in the other loop. Which was about 40% in Crystal version.

RES.iter().enumerate().filter_map(|i , r_i)| {
to
RES.iter().enumerate().filter_map(move |(i, r_i)| {

Actually iterating over the rows of the byte bits for array length is faster (only need 8 threads) than iterating over each byte bit in the array columns. The later extracts the primes in order, but that's algorithmically unnecessary for where the primes are used.

  let primes = RES.par_iter().enumerate().flat_map_iter(|(i, r_i)| {
    prms.iter().enumerate().filter_map(move |(k, resgroup)| {
        if resgroup & (1 << i) == 0 { 
            let prime = md * k + r_i;   
            let (n, rem) = (start_num / prime, start_num % prime);
            if (prime >= res_0 && prime <= val) && (prime * (n + 1) <= end_num || rem == 0) {
                return Some(prime);
            }
        }
        None
    })
  }).collect();

OK, I tried this approach that worked in Crystal.

fn sieve(ri: usize, md: usize, k: usize, prime: usize, prm_r: usize, kmax: usize, bitn: &Vec<u8>, prms: &mut Vec<u8>) {
  let prod = prm_r * ri - 2;
  let bit_r = bitn[prod % md];
  let mut kpm = k * (prime + ri) + prod / md;
  while kpm < kmax { prms[kpm] |= bit_r; kpm += prime };
}


    loop {
    if r == rscnt { r = 0; modk += md; k += 1 }
    if (prms[k] & (1 << r)) != 0 { r += 1; continue }
    let prm_r = RES[r];
    let prime = modk + prm_r;
    if  prime > sqrt_n { break }
    RES.par_iter().for_each (|ri| {
      sieve(*ri, md, k, prime, prm_r, kmax, &BITN.to_vec(), &mut prms);
    });
    
error[E0596]: cannot borrow `prms` as mutable, as it is a captured variable in a `Fn` closure
   --> src/main.rs:183:61
    |
183 |       sieve(*ri, md, k, prime, prm_r, kmax, &BITN.to_vec(), &mut prms);
    |                                                             ^^^^^^^^^ cannot borrow as mutable

For more information about this error, try `rustc --explain E0596`.
error: could not compile `twinprimes_ssoz` due to previous error

It still won't accept passing in the whole array.

To do it your way I think means constructing the byte rows as 8 arrays representing those values.
That means creating|processing 8 arrays separately, which means extracting primes from them separately.

Before I even go there (if I think its worth it) is there some unsafe way to share the byte array between the threads using something equivalent to pointers?

Or is there some equivalent way to tell Rayon|Rust, damn it, just do what I want, I accept all consequences!!

Is it possible that two different threads could perform a bit-or operation on the same index?

With my earlier suggestion of atomics, that indexed update can use fetch_or, and even Ordering::Relaxed is probably fine.

No. The mathematics ensure for each instance in the loop only one byte row is written at a time, i.e. they are totally independent. Each time thru the loop a different row in processed until all 8 are done. That's why the Crystal version works. I suspect, and wanted to verify, the Rust version worked, which should be faster because Rayon is more mature than Crystal's current threading model.

I have answered a similar question before, then: multithreading - How do I write to a mutable slice from multiple threads at arbitrary indexes without using mutexes? - Stack Overflow

The prms vector is an [8 x kmax] table, where kmax is determined by the input val. So a byte represents the 8 rows of kmax columns.

I really don't want to do this (it's less memory efficient), but I could conceptually create prms as a vector of length (8*kmax) segments, where each kmax section represent 1 byte row values. Then I could process them as 8 separate slices and do in parallel. Would this make the compiler happier?

I still would like to do it the my original way first, just to know I can do it that way in Rust.

If you are making parallel writes to the same bytes, then your code simply will not work, even if they are modifying different bits of that byte. Any code that does this in any language will be incorrect. If it gave the right result, then you were simply lucky that the timing was such that the operations didn't get messed op, and future runs of the same code may result in the timing being slightly different in a way that makes it give the wrong result.

The two main solutions are to use a separate byte per bit as you have suggested, or to use an array of AtomicU8, which provides a fetch_or method that you can use to make the operations safely in parallel.

1 Like

Can you provide example code to do this?

You can create an Vec<AtomicU8> and call prms[kpm].fetch_or(bit_r, Relaxed).

The array as AtomicU8 causes a bunch of incompatibilities elsewhere when reading|writing to it

Any more suggestions?

There is an unstable AtomicU8::from_mut method to convert &mut u8 to &mut AtomicU8. The tracking issue also mentions the possibility of adding from_mut_slice. Then you could have your mutable Vec<u8> for most of the code and borrow it as an atomic slice for the parallel updates.

You can do this yourself with a pointer cast:

fn atomic_slice(slice: &mut [u8]) -> &[AtomicU8] {
    unsafe { &*(slice as *mut [u8] as *const [AtomicU8]) }
}

Note that this function creates an implicit lifetime bound between the input and output slices, which lets the borrow checker track it properly, but this would not be the case if you wrote that same block of code directly within your other code. In general, &*ptr can create an arbitrary lifetime, even 'static, so it's important to constrain it properly.

1 Like

Thanks for the suggestion but I have no idea how to functionally code that in the program to get what I want.

Unfortunately, it seems its just not viable to do this in Rust, though easy to do in Crystal.
I know you're philosophically opposed to doing this (sharing a mutable array between threads), but real world needs don't always align with arbitrary philosophical edicts.

What we are trying to prevent you from doing is to make a data race by modifying the same index of the array from several threads in parallel. This is wrong in Rust, and it is also wrong in Crystal. If you want to write the same incorrect code in Rust, you can use the stackoverflow link I posted earlier and ignore the requirement that the indexes are disjoint on the unsafe function.

The only difference between what you already posted and what @cuviper suggested is that you should add the following before the rayon loop:

let prms_atomic = atomic_slice(&mut prms);

Then replace prms[kpm] |= right-hand-side with this:

prms_atomic[kpm].fetch_or(right-hand-side, Ordering::Relaxed);

Once you have finished the parallel rayon section, you can use the prms vector directly, and the changes will be visible in the vector.

1 Like

To illustrate why this isn't just a philosophical discussion, and that your code actually does the wrong thing, consider the following code:

use std::cell::UnsafeCell;
use rayon::prelude::*;

#[derive(Copy, Clone)]
pub struct UnsafeSlice<'a, T> {
    slice: &'a [UnsafeCell<T>],
}
unsafe impl<'a, T: Send + Sync> Send for UnsafeSlice<'a, T> {}
unsafe impl<'a, T: Send + Sync> Sync for UnsafeSlice<'a, T> {}

impl<'a, T> UnsafeSlice<'a, T> {
    pub fn new(slice: &'a mut [T]) -> Self {
        let ptr = slice as *mut [T] as *const [UnsafeCell<T>];
        Self {
            slice: unsafe { &*ptr },
        }
    }
    
    /// SAFETY: It is UB if two threads write to the same index without
    /// synchronization.
    pub unsafe fn get_index(&self, i: usize) -> &mut T {
        let ptr = self.slice[i].get();
        &mut *ptr
    }
}


fn main() {
    const LEN: usize = 1024*1024;

    let mut array = vec![0u8; LEN];
    let array_unsafe_slice = UnsafeSlice::new(&mut array);
    
    (0..8).into_par_iter().for_each(|bit_idx| {
        let bit = 1 << bit_idx;
        for i in 0..LEN {
            unsafe {
                *array_unsafe_slice.get_index(i) |= bit;
            }
        }
    });
    
    let mut counter = 0;
    for byte in array {
        if byte != 0xff {
            counter += 1;
        }
    }
    println!("Failed to set all bits of {} indexes.", counter);
}

click here to run

This code uses the utility from the stack overflow link to unsafely set every bit in a megabyte to one, one bit at the time. Then it counts how many bytes did not have all of its bits set to one. When I ran it, it gave the following output:

Failed to set all bits of 12995 indexes.

(the output varies from run-to-run)

Obviously if you change it to not run in parallel by removing the into_par_iter(), then there is no data race and it always prints zero as it should.

Similar code in crystal should give similar results.

2 Likes

OK, I deciphered your comments, and the compiler, and figured everything out.
Here are all the things needed for the code to compile. And it works great like I want. :grinning:

extern crate integer_sqrt;
extern crate rayon;
use integer_sqrt::IntegerSquareRoot;
use rayon::prelude::*;
use std::sync::atomic::{AtomicU8, Ordering};

fn atomic_slice(slice: &mut [u8]) -> &[AtomicU8] {
    unsafe { &*(slice as *mut [u8] as *const [AtomicU8]) }
}

fn sozpg(val: usize, res_0: usize, start_num : usize, end_num : usize) -> Vec<usize> {
  let (md, rscnt) = (30, 8);
  static RES: [usize; 8] = [7,11,13,17,19,23,29,31];
  static BITN: [u8; 30] = [0,0,0,0,0,1,0,0,0,2,0,4,0,0,0,8,0,16,0,0,0,32,0,0,0,0,0,64,0,128];
  
  let kmax = (val - 2) / md + 1;  
  let mut prms = vec![0u8; kmax]; 
  let sqrt_n = val.integer_sqrt();
  let (mut modk, mut r, mut k) = (0, 0, 0);
  
  loop {
    if r == rscnt { r = 0; modk += md; k += 1 }
    if (prms[k] & (1 << r)) != 0 { r += 1; continue }
    let prm_r = RES[r];         
    let prime = modk + prm_r;   
    if  prime > sqrt_n { break }
    let prms_atomic = atomic_slice(&mut prms);
    // mark prime's multiples on each bit row in prms in parallel
    RES.par_iter().for_each (|ri| { 
      let prod = prm_r * ri - 2;    
      let bit_r = BITN[prod % md];  
      let mut kpm = k * (prime + ri) + prod / md;
      while kpm < kmax { prms_atomic[kpm].fetch_or(bit_r, Ordering::Relaxed); kpm += prime; };
    });
    r += 1;
  }
  // numerate the primes on each bit row in prms in parallel
  let primes = RES.par_iter().enumerate().flat_map_iter( |(i, ri)| {
    prms.iter().enumerate().filter_map(move |(k, resgroup)| {
      if resgroup & (1 << i) == 0 { 
        let prime = md * k + ri;   
        let (n, rem) = (start_num / prime, start_num % prime);
        if (prime >= res_0 && prime <= val) && (prime * (n + 1) <= end_num || rem == 0) {
          return Some(prime);
      } } None
  }) }).collect();
  primes
}

Here's the actual code base it's apart of.

If you want to know what it's doing see this video, and check out links in the code.

Thanks to @alice, @cuviper, and everyone else for your help and patience.

And @alice, everything you brought up I was already aware of. As I stated in the beginning, Crystal doesn't have a mature multi-threading model, but it's good enough to do what I wanted to show the efficacy of the approach.

Sometimes parallel index hits do occur, which causes some bits (very few in the big picture) to not be set to '1', but that doesn't effect the overall algorithm those primes are used in. All that means algorithmically is that more than the mathematical minimum number of values (primes) are used later in the program, but always the necessary primes will be generated. At the most, the Crystal version generates in the 10s more of unnecessary values by not marking some bits as nonprimes (composites) as it should. The key thing is to generate them in parallel, i.e. fast, so they can be used in the general SSoZ sieve (in parallel) as soon as possible. Even a few 100 extra unnecessary values wouldn't matter, as the speed increase swamps out any other coding inefficiencies.

But I figured (and was correct) Rust|Rayon should be able to do it with maximum coding efficiency and not miss any composites. It was just frustrating to be so close, and yet so far, in me getting it to work. I just didn't know enough of the details on how to do it. But now with yours help I do (I still need to understand how/why everything is needed to work though).

So again, thanks for all the help.

I'm in the process of writing another paper (maybe do another video?) explaining in gory detail the math, algorithm, coding, and thinking behind the SSoZ. When I finish it I'll post it somewhere in the forum for people who want to know, because it really is a good showcase for how to use Rust (or any language) to do advanced parallel programming in a numerically heavy algorithm.