Help -- Fastest Prime Sieve, in Rust

I'm translating a Twin Primes Sieve originally written in Nim (then ported to D).

Here's the Nim version:

Here's the D version:

So far, here's my Rust version.

Can compile with: $ rustc -O twinprimes_ssoz.rs

My Rust version works, in that it produces correct values for its inputs. The biggest problem is the Rust threading model vs. Nim|D's.

Monitoring thread use with htop, the Nim|D versions only have pids active for the number of available threads (8 on my system). The Rust version, which uses channels, shows dozens and dozens of active pids at a time, and increasing memory use too. When the inputs reach a size that uses up all my RAM (16 GB) the program crashes (segfaults).

The versions also differ in that I use global variables in the Nim|D versions, which allow me to create compile-time constants. Since I can't do the same with globals in Rust, I generate the necessary constants at runtime, which luckily doesn't add too much to the setup time.

However, in the Nim|D versions I create two global arrays to store the results for each thread (that performs twins_sieve). In Rust I use channels to collect the results from each thread, and process results after they finish.

Below is the threading code:

  //Create a channel to send the results from the threads
  let (sender, receiver) = mpsc::channel();

  for index in 0..pairscnt {
    //Clone the necessary data
    let (Kmin_0, Kmax_0, KB_0, modpg_0, start_num_0, end_num_0, primes_0, restwins_0, resinvrs_0, pos_0, sender_0) =
            (Kmin, Kmax, KB, modpg, start_num, end_num,
             primes.clone(), restwins.clone(), resinvrs.clone(), pos.clone(), sender.clone());
    //Execute 'twin_sieve' in parallel, sending the results to the channel
    thread::spawn(move || {
        sender_0.send(twins_sieve(index, Kmin_0, Kmax_0, KB_0, start_num_0, end_num_0, modpg_0,
                      Arc::new(primes_0.to_vec()), Arc::new(restwins_0), Arc::new(resinvrs_0), Arc::new(pos_0)))
        .expect("Unable to send to channel");
    });
    print!("\r{} of {} threads done", index + 1, pairscnt);
  }

  let mut last_twin = 0usize;      // find largest twin prime in range
  for _ in 0..pairscnt {
        let (l, c) = receiver.recv().expect("Unable to receive from channel");
        twinscnt += c;
        if l > last_twin {last_twin = l;}
  }

Here's the thread code in Nim:

  parallel:                        # perform in parallel
    for indx in 0..pairscnt - 1:   # for each twin pair row index
      spawn twins_sieve(indx)      # sieve selected twin pair restracks
      stdout.write("\r", (indx + 1), " of ", pairscnt, " threads done")
  sync()                           # when all the threads finish
  var last_twin = 0'u              # find largest twin prime in range
  for i in 0..pairscnt - 1:        # and determine total twin primes count
    twinscnt += cnts[i]
    if last_twin < lastwins[i]: last_twin = lastwins[i]

I'm trying to do this on my own. I've been looking at using crossbeam to do the threading, but haven't figured out how to use it for my case.

I assume the Rust version will be comparable in speed once the threading process is done better.

Thus, my immediate question is: how do I do threading in Rust so that it doesn't spawn off more pids than available threads (thread pools?) and/or eat up an increasing amount of memory?


This looks like a good candidate for rayon. With that, you might replace the loop with something like:

let last_twin = (0..pairscnt)
    .par_iter()
    .map(|i| {
        twins_sieve(
            i,
            Kmin,
            Kmax,
            ...,  // other arguments
            primes.clone(),
            ...,  // other arguments
        )
    })
    .max()
    .unwrap_or(0);

This is the working serial version of what I'd like to do in parallel.
As you can see, it doesn't use an iterator.
How do I turn this serial version into iterator form, which I should then be able to parallelize with rayon's par_iter(), right?

  let mut lastwins = vec![0usize; pairscnt];
  let mut cnts = vec![0usize; pairscnt];
  for index in 0..pairscnt {
       let (last, sum) = twins_sieve(index, Kmin, Kmax, KB, start_num, end_num, modpg,
                      Arc::new(primes.to_vec().clone()), Arc::new(restwins.clone()), Arc::new(resinvrs.clone()), Arc::new(pos.clone()));
      lastwins[index] = last; cnts[index] = sum;
  };

  let mut last_twin = 0usize;      // find largest twin prime|sum in range
  for i in 0..pairscnt {
     twinscnt += cnts[i];
     if last_twin < lastwins[i] { last_twin = lastwins[i]; }
  }

OK, I found out how to turn the range 0..pairscnt into a Vec, which I can then call .iter() on, but this still won't let me use par_iter() on it. Also, turning the range into a Vec is slower than just iterating over the range in the serial case.

  let pairs_indexes: Vec<usize> = (0..pairscnt).collect();
  for &index in pairs_indexes.iter() {
       let (last, sum) = twins_sieve(index, Kmin, Kmax, KB, start_num, end_num, modpg,
                      Arc::new(primes.to_vec().clone()), Arc::new(restwins.clone()), Arc::new(resinvrs.clone()), Arc::new(pos.clone()));
      lastwins[index] = last; cnts[index] = sum;
  };

@jzakiya you have to get rid of the for loop entirely... see @asymmetrikon's example.

The serial version does (functionally) what I want. twins_sieve returns 2 values: the number (count) of twin primes and the largest twin prime per index|thread. Thus, I sum the counts for each index|thread and keep the max twin prime value. The example @asymmetrikon gave doesn't do that, and it didn't compile anyway.

Again, the original parallel version I show in the gist works in that it produces correct answers, but it eats up memory, unlike the Nim|D versions, which is why I'm seeking a better Rust parallel version.

But no matter how I do it, (0..pairscnt).iter() or (0..pairscnt).par_iter() doesn't compile, because it's a range and the compiler won't allow .iter() on it.

So his given example won't compile, and doesn't functionally do what I want anyway.

The problem with par_iter is fixed by writing

(0..pairscnt).into_par_iter()

This iterates by value, whereas par_iter iterates by reference.

Not sure about/am not awake enough yet to know how much fixing that will ultimately help.
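
Roughly, the difference looks like this (a minimal, self-contained sketch, nothing specific to your sieve):

use rayon::prelude::*;

fn main() {
    // par_iter() borrows: items are &usize, taken from a collection that
    // owns its data, which is why it doesn't exist on a bare Range.
    let v: Vec<usize> = (0..10).collect();
    let by_ref: usize = v.par_iter().sum();

    // into_par_iter() consumes by value: items are usize. Ranges implement
    // IntoParallelIterator directly, so no intermediate Vec is needed.
    let by_val: usize = (0..10usize).into_par_iter().sum();
    assert_eq!(by_ref, by_val);
}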

(0..pairscnt).into_par_iter() { |index|
       let (last, sum) = twins_sieve(index, Kmin, Kmax, KB, start_num, end_num, modpg,
                      Arc::new(primes.to_vec().clone()), Arc::new(restwins.clone()), Arc::new(resinvrs.clone()), Arc::new(pos.clone()));
      lastwins[index] = last; cnts[index] = sum;
  };
➜  twinprimes_ssoz git:(master) ✗ cargo build --release          
   Compiling twinprimes_ssoz v0.1.0 (/home/jzakiya/Rust Projects/twinprimes_ssoz)
error: expected one of `.`, `;`, `?`, `}`, or an operator, found `{`
   --> src/main.rs:309:33
    |
309 |   (0..pairscnt).into_par_iter() { |index|
    |                                 ^ expected one of `.`, `;`, `?`, `}`, or an operator here

error[E0308]: mismatched types
   --> src/main.rs:309:3
    |
231 | fn main() {
    |           - expected `()` because of default return type
...
309 |   (0..pairscnt).into_par_iter() { |index|
    |   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^- help: try adding a semicolon: `;`
    |   |
    |   expected (), found struct `rayon::range::Iter`
    |
    = note: expected type `()`
               found type `rayon::range::Iter<usize>`

error: aborting due to 2 previous errors

For more information about this error, try `rustc --explain E0308`.
error: Could not compile `twinprimes_ssoz`.

To learn more, run the command again with --verbose.

OK, I found out how to make it do 97% of what I want; it's the other 3% that's throwing me.

The documentation below is extremely helpful.

https://hub.packtpub.com/multithreading-in-rust-using-crates-tutorial/
https://rust-lang-nursery.github.io/rust-cookbook/concurrency/parallel.html

Here's the 3% problem.

twins_sieve produces 2 output values: the twin primes (tp) count and the largest tp per thread.
I can use rayon iterators to find either the sum of all the counts OR the max tp value with no problem.
To do this I have to modify twins_sieve to output only the parameter I want to process (tp counts, or largest tp). Below is working code to find the sum (total count) of twin primes in parallel.

  let tp_sum: usize = (0..pairscnt).into_par_iter().map( |index|
     twins_sieve(index, Kmin, Kmax, KB, start_num, end_num, modpg,
        Arc::new(primes.to_vec().clone()), Arc::new(restwins.clone()), Arc::new(resinvrs.clone()), Arc::new(pos.clone()) )
  ).sum();

If I want to find the largest twin prime I just change .sum() to .max().
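
That is, with twins_sieve modified to return just the largest tp (note rayon's .max() yields an Option, so a default is needed):

  let tp_max: usize = (0..pairscnt).into_par_iter().map( |index|
     twins_sieve(index, Kmin, Kmax, KB, start_num, end_num, modpg,
        Arc::new(primes.to_vec().clone()), Arc::new(restwins.clone()), Arc::new(resinvrs.clone()), Arc::new(pos.clone()) )
  ).max().unwrap_or(0);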

The threads now work like in Nim|D, and don't eat up memory (though a tad slower than the Rust parallel version using explicit threads).

Now I'm stuck on figuring out how to process both twins_sieve outputs in parallel.
If I use .map() as above and try to assign the output to a tuple, let (largest, cnts) = ...,
the compiler squawks.

Conceptually, below is the simplest way I want to do it.

  let lastwins = vec![0usize; pairscnt];
  let cnts = vec![0usize; pairscnt];
  (0..pairscnt).into_par_iter().for_each( |index| {
     let (last, sum) = twins_sieve(index, Kmin, Kmax, KB, start_num, end_num, modpg,
        Arc::new(primes.to_vec().clone()), Arc::new(restwins.clone()), Arc::new(resinvrs.clone()), Arc::new(pos.clone()) );
     lastwins[index] = last as usize; cnts[index] = sum as usize;
  });

If I comment out the last line: //lastwins... this actually compiles! (Of course I can't use the results.)
This is essentially the serial version I showed previously that works.

The problem is I don't know how to make lastwins and cnts be in scope AND be mutable so they can be written to in each thread. (In the Nim|D versions they are global variables.)

This is the last obstacle to overcome, using this design method. I suspect doing it the Rust way may require a different approach.

Also, the rayon docs seem to say I don't need to use Arc::new() to pass variables, which slows things down. I haven't tried it yet (I want to make it fully work first), but is that true? Can I eliminate all the Arcs when passing the variables, and will that make the code faster?

Finally, are there other (obvious) things I can do all around to make the code faster?

It's hard to help much without the full code at hand, but:

Assuming twins_sieve is unchanged from the gist posted earlier, the arcs are totally redundant. The function never calls clone on any of them (which is literally the whole point of Arc), so it can just take borrowed slices:

fn twins_sieve(
    indx: usize, Kmin: usize, Kmax: usize, KB: usize, start_num: usize, end_num: usize, modpg: usize,
    primes: &[usize], restwins: &[usize], resinvrs: &[usize], pos: &[usize],
) -> (usize, usize)

This doesn't have anything to do with what rayon or some library X requires as you mention in your second to last paragraph; all I'm saying is, you wrote twins_sieve as well as the code that calls it on line 290, so you have complete control over what arguments that function takes. The only thing you need to worry about is which variables you are naming from the local environment inside the closure that begins on line 288.
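
To illustrate why plain borrows work here (a minimal sketch, not your actual sieve): a rayon closure can capture references to locals because rayon joins all of its work before the parallel call returns, so nothing needs to be 'static. That's what made Arcs necessary with thread::spawn and unnecessary here:

use rayon::prelude::*;

// `primes` is only borrowed by the closure; rayon finishes all the parallel
// work before `count_multiples` returns, so no Arc or clone is needed.
fn count_multiples(primes: &[usize], k: usize) -> usize {
    (1..=k).into_par_iter()
        .map(|d| primes.iter().filter(|&&p| p % d == 0).count())
        .sum()
}

fn main() {
    let primes = vec![2, 3, 5, 7, 11, 13];
    println!("{}", count_multiples(&primes, 4));
}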

As for the borrow issues: When indices give you trouble, use iterators that mutably borrow.

let mut lastwins = vec![0usize; pairscnt];
let mut cnts = vec![0usize; pairscnt];
lastwins.par_iter_mut()
    .zip(cnts.par_iter_mut())
    .enumerate()
    .for_each(|(index, (lastwin, cnt))| {
        /* blah */
        *lastwin = last as usize;
        *cnt = sum as usize;
    });

But this pattern of taking an iterator and unpacking it into two collections in lockstep already has a dedicated function: unzip. (which is basically collect, but for two vectors)

let (lastwins, cnts): (Vec<_>, Vec<_>) = {
    (0..pairscnt).into_par_iter()
        .map(|index| {
            let (last, sum) = /* blah */;
            (last as usize, sum as usize)
        }).unzip()
};

My Man, that did it! :blush:

Here's the final working snippet.

  let lastwins = vec![0usize; pairscnt];
  let cnts = vec![0usize; pairscnt];
  let (lastwins, cnts): (Vec<_>, Vec<_>) = {
    (0..pairscnt).into_par_iter()
       .map( |index|
          twins_sieve( index, Kmin, Kmax, KB, start_num, end_num, modpg,
          Arc::new(primes.to_vec().clone()), Arc::new(restwins.clone()), 
          Arc::new(resinvrs.clone()), Arc::new(pos.clone()) )
    ).unzip()
  };
  let mut last_twin = 0usize; // find largest twin prime|sum in range
  for i in 0..pairscnt {
     twinscnt += cnts[i];
     if last_twin < lastwins[i] { last_twin = lastwins[i]; }
  }

The threads work correctly (like in Nim|D, they don't devour memory) and produce the correct answers. It still takes a consistent 10% longer than the Nim|D versions, but I'll remove the Arcs and see how much that speeds it up.

When I fully revise the code I'll link the gist again for people to see it. If you see more ways to improve it, please post them and let me know. And feel free to play with it!

If you want to know about the algorithm see my paper below.

Thanks again @ExpHP for your help.

Note, the last let shadows the first two -- the compiler should even be warning you that the first two are unused. But if you do want to reuse existing vectors, say as a buffer for multiple runs, you can either use unzip_into_vecs, or use an unzipping extend like (lastwins, cnts).par_extend(...).

Thanks @cuviper for the suggestions.

I ran the compiler with the original code and saw the warnings about the first lastwins|cnts variables being unused, so I commented them out, and it compiled again.

Being new to Rust, could you explain real cases where you'd use unzip_into_vecs and extend?
I saw the docs at the links but can't imagine cases you'd use them for.

Mostly you would use those if you want to avoid the allocator, reusing the same block of memory multiple times. That may be an unnecessary micro-optimization in the context of thread synchronization though.

Another aspect is that unzip_into_vecs has a stronger guarantee about writing directly to the target vectors, being a method of IndexedParallelIterator, whereas unzip is more flexible and might require temporary storage. Think of things like filter and flat_map -- since we don't know how many items will be produced, we can't write them directly to their destination. The unzip implementation does try to use direct indexing when possible though.

Extend is also additive, so it could be useful for accumulating results from multiple parallel iterators, if for some reason you can't chain them all at once.
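
For instance, a minimal sketch of both (the map closures are stand-ins for real work like twins_sieve):

use rayon::prelude::*;

fn main() {
    // Reuse the same two vectors across runs: unzip_into_vecs refills them
    // in place, so later runs reuse the first run's allocations.
    let mut lastwins: Vec<usize> = Vec::new();
    let mut cnts: Vec<usize> = Vec::new();
    for run in 0..3usize {
        (0..1000usize).into_par_iter()
            .map(|i| (i + run, i))
            .unzip_into_vecs(&mut lastwins, &mut cnts);
    }

    // The additive form: par_extend on a tuple of vectors appends, so it
    // can accumulate results from multiple parallel iterators.
    let mut pair: (Vec<usize>, Vec<usize>) = (Vec::new(), Vec::new());
    pair.par_extend((0..500usize).into_par_iter().map(|i| (i, i * 2)));
    pair.par_extend((500..1000usize).into_par_iter().map(|i| (i, i * 2)));
    assert_eq!(pair.0.len(), 1000);
}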

Thanks. There's so much to learn about Rust that you don't even know you need to...:slightly_frowning_face:

Almost done...

As instructed, I removed the Arc::new references, and the .clone()s too, which makes the code much simpler.

fn twins_sieve(indx: usize, Kmin: usize, Kmax: usize, KB: usize, 
   start_num: usize, end_num: usize, modpg: usize,
   primes: &[usize], restwins: &[usize], resinvrs: &[usize], 
   pos: &[usize]) -> (usize, usize) {
   .................
   .................
}
 
 let (lastwins, cnts): (Vec<_>, Vec<_>) = {
    (0..pairscnt).into_par_iter()
       .map( |index|
          twins_sieve( index, Kmin, Kmax, KB, start_num, end_num, modpg,
          &primes.to_vec(), &restwins, &resinvrs, &pos)
    ).unzip()
  };

But I noticed a quirk.

  1. Compiling with &primes.to_vec(),... vs &primes,... the binary is 712 bytes bigger and executes faster (by ~2.5 seconds, ~53.3 vs ~55.8, for an input of 1 trillion (1_000_000_000_000); Nim is ~48 secs). So I tested the other vectors with to_vec(), which compiled but had no effect on binary size or speed. I had to use primes.to_vec() to get the original threaded version to compile.

  2. Like in the Nim|D versions, I want to have a progress indicator showing how many threads have been processed, to give an idea of when it will finish (nice to have for large ranges). In the Nim|D code I just put in a print statement; in Rust it would be like:

    print!("\r{} of {} threads done", index + 1, pairscnt);

This works in my previous Rust version, but if I put this line inside the new parallel code it won't compile. Any suggestions on how to add this feature to this code? I'd like to include a progress indicator in the version I end up releasing.

That's bizarre. Can't really help without the code though.

What is the error?

Here's the old code using explicit threads and Arcs for multi-threading.

Here's the new code using rayon for multi-threading.

You can run and see what I'm talking about on your system.

I see the opposite effect.

$ vim src/main.rs # observe that .to_vec() is present
$ echo 1 1000000000000 | cargo run --release | grep 'sieve time'
sieve time = 119.699186963 secs
$ echo 1 1000000000000 | cargo run --release | grep 'sieve time'
sieve time = 119.994843401 secs

$ vim src/main.rs # remove the .to_vec()
$ echo 1 1000000000000 | cargo run --release | grep 'sieve time'
sieve time = 118.866689849 secs
$ echo 1 1000000000000 | cargo run --release | grep 'sieve time'
sieve time = 118.568100914 secs

$ vim src/main.rs # add the .to_vec() back just in case
$ echo 1 1000000000000 | cargo run --release | grep 'sieve time'
sieve time = 119.371369926 secs

(...but! Watch what happens next...)


I get no compiler error for adding the print:

       .map(|index| {
         let out = twins_sieve(
           index, kmin, kmax, kb, start_num, end_num, modpg,
           &primes, &restwins, &resinvrs, &pos,
         );
         print!("\r{} of {} threads done", index + 1, pairscnt);
         out
       }).unzip()

However, they all print out of order because they run out of order. To fix this you need a counter:

use std::sync::atomic::{self, AtomicUsize};

// A counter implemented using relaxed (unsynchronized) atomic operations.
struct RelaxedCounter(AtomicUsize);

impl RelaxedCounter {
    fn new() -> Self {
        RelaxedCounter(AtomicUsize::new(0))
    }
    /// Increment and get the new value.
    fn increment(&self) -> usize {
        self.0.fetch_add(1, atomic::Ordering::Relaxed) + 1
    }
}
let (lastwins, cnts): (Vec<_>, Vec<_>) = {
  let counter = RelaxedCounter::new();
  (0..pairscnt).into_par_iter()
     .map(|index| {
       let out = twins_sieve(
         index, kmin, kmax, kb, start_num, end_num, modpg,
         &primes, &restwins, &resinvrs, &pos,
       );
       print!("\r{} of {} threads done", counter.increment(), pairscnt);
       out
     }).unzip()
};
$ echo 1 1000000000000 | cargo run --release
core threads = 4
generating parameters for P13
each thread segment is [1 x 100352] bytes array
twinprime candidates = 49450550490; resgroups = 33300034
each of 1485 threads has nextp[2 x 78492] array
setup time = 0.005585003 secs
perform twinprimes ssoz sieve
1485 of 1485 threads done
sieve time = 117.669866576 secs
total time = 117.675470472 secs
last segment = 384578 resgroups; segment slices = 42
total twins = 1870585220; last twin = 999999999960+/-1

Huh, would you look at that. Now it's 117.7s, even though my earlier runs seemed to show very little variance. That's what makes these benchmarks so difficult.


(on a related note, I was messing around with your old gist earlier today trying to make it more readable and idiomatic, and I daresay: never before have I seen code that was so strongly susceptible to minute optimization details! I even tried just getting rid of K_min and adding mut to Kmin, and that alone caused a butterfly effect in register assignments and stack locations that appeared to produce a 10% slowdown!! ...though I do suspect that the 1500 competing threads it used to create did not help my benchmarking efforts.)

First, here are my system specs to compare to.

➜  ~ inxi -SCM
System:    Host: localhost.localdomain Kernel: 5.2.6-pclos1 x86_64 bits: 64 Desktop: KDE Plasma 5.16.4 Distro: PCLinuxOS 2019 
Machine:   Type: Laptop System: System76 product: Gazelle v: gaze10 serial: <root required> 
           Mobo: System76 model: Gazelle v: gaze10 serial: <root required> UEFI [Legacy]: American Megatrends v: 1.05.08 
           date: 03/31/2016 
CPU:       Topology: Quad Core model: Intel Core i7-6700HQ bits: 64 type: MT MCP L2 cache: 6144 KiB 
           Speed: 800 MHz min/max: 800/3500 MHz Core speeds (MHz): 1: 1161 2: 1132 3: 969 4: 1131 5: 839 6: 1090 7: 871 
           8: 1146 
➜  ~ rustc --version
rustc 1.36.0 (a53f9df32 2019-07-03)

@ExpHP I ran your new code with your ordered counter and got very different behavior on my system. The first time I ran it the ordered counter was the slowest, so I rebooted and ran it again. Now the unordered counter is the slowest, the ordered counter slightly faster, and no counter fastest. Below are representative times in seconds for an input of 2 trillion (to make the difference vivid on my system).

   input    no counter   ordered    unordered    Nim
    2e12       110.8       111.6      114.6    ~100.0

Here's the code; comment/uncomment the marked lines for each of the 3 cases (shown here for the no-counter case).

  let (lastwins, cnts): (Vec<_>, Vec<_>) = {
    //let counter = RelaxedCounter::new();
    (0..pairscnt).into_par_iter()
       .map( |index| {
          let out = twins_sieve( index, kmin, kmax, kb, start_num, end_num, modpg,
          &primes.to_vec(), &restwins, &resinvrs, &pos);
          //print!("\r{} of {} threads done", counter.increment(), pairscnt);
          //print!("\r{} of {} threads done", index + 1, pairscnt);
          out
       }).unzip()
  };

The ultimate takeaway for me, though, is that for this particular case multi-threading in Rust is slower than in Nim|D. Nim|D's ability to spawn threads works better for this use case (does Rust have an equivalent?). Even using channels to send results in Nim is faster (though slightly slower than without channels).

I've tested the Rust version(s) over my test values, and as the inputs increase Rust becomes increasingly slower, with all of the difference being in the sieving/multi-threading (the setup times are comparable and negligible).

Of course, maybe a Rust wizard|guru could totally restructure the algorithm into magical Rust to make it faster. That would be very interesting to see. :slightly_smiling_face: