Parallel product for factorial: surprised by the results

Hi all,

Continuing my exploration of Rust threads from my earlier thread "Help for my parallel sum", I tried to build a parallelized factorial of big numbers using the num::bigint::BigUint type:

extern crate crossbeam;
extern crate num;

use std::env;
use std::iter::Product;
use std::sync::mpsc;
use std::time::Instant;

use num::bigint::BigUint;

// function type which will run in each thread
type ChunkTask<'a, T> = fn(&'a [T]) -> T;

//---------------------------------------------------------------------------------------
// trait to call its fn directly from a Vec<T>
//---------------------------------------------------------------------------------------
pub trait ParallelTask<T> {
    // distribute work among threads; the resulting Vec<T> collects each thread's result
    fn parallel_task<'a>(&'a self, nb_threads: usize, computation: ChunkTask<'a, T>) -> Vec<T>
    where
        T: 'a + Send + Sync;
}

impl<T> ParallelTask<T> for [T] {
    fn parallel_task<'a>(&'a self, nb_threads: usize, computation: ChunkTask<'a, T>) -> Vec<T>
    where
        T: 'a + Send + Sync,
    {
        // compute the chunk size for the requested number of threads, rounding up
        let chunk_size = (self.len() + nb_threads - 1) / nb_threads;

        // create the channel to be able to receive partial sums from threads
        let (sender, receiver) = mpsc::channel::<T>();

        // empty vector which will receive all computed values from the worker threads
        let mut values: Vec<T> = Vec::new();

        crossbeam::scope(|scope| {
            // spawn threads: each one computes a partial result for its chunk
            for chunk in self.chunks(chunk_size) {
                // each thread gets its individual sender
                let thread_sender = sender.clone();

                // spawn thread
                scope.spawn(move |_| {
                    // call dedicated specialized fn
                    let partial_sum: T = computation(chunk);

                    // send it through channel
                    thread_sender.send(partial_sum).unwrap();
                });
            }

            // drop our remaining sender, so the receiver won't wait for it
            drop(sender);

            // collect the results from all threads
            values = receiver.iter().collect();
        })
        .unwrap();

        values
    }
}

//---------------------------------------------------------------------------------------
// Samples of computation functions in each thread
//---------------------------------------------------------------------------------------

// product of the elements of a chunk
fn prod_fn<'a, T: Product<&'a T>>(chunk: &'a [T]) -> T {
    chunk.iter().product::<T>()
}

//---------------------------------------------------------------------------------------
// Runs different test scenarios
//---------------------------------------------------------------------------------------
fn main() {
    // manage arguments
    let args: Vec<String> = env::args().collect();
    if args.len() != 3 {
        println!("fact [upper_bound] [nb_thread]");
        return;
    }

    // get arguments
    let upper_bound = match args[1].parse::<u32>() {
        Ok(n) => n,
        Err(e) => panic!("error {} converting {} to an integer!", e, &args[1]),
    };
    let nb_threads = match args[2].parse::<u32>() {
        Ok(n) => n,
        Err(e) => panic!("error {} converting {} to an integer!", e, &args[2]),
    };

    // fill-in vector
    let v: Vec<BigUint> = (1..=upper_bound).map(|i| BigUint::new(vec![i])).collect();

    // get time for the mono-threaded product
    let mut start = Instant::now();
    let mono_fact = v.iter().product::<BigUint>().to_str_radix(10);
    let duration_mono = start.elapsed();

    // get time for multi-threaded computation
    start = Instant::now();
    let partial_fact = v.parallel_task(nb_threads as usize, prod_fn);
    let multi_fact = partial_fact.iter().product::<BigUint>().to_str_radix(10);
    let duration_multi = start.elapsed();

    // validity check: check if products are equal
    assert_eq!(mono_fact, multi_fact);

    println!("n={},#threads={},mono_threaded={:?},{}_threaded={:?}", upper_bound, nb_threads, duration_mono, nb_threads, duration_multi);

}

Now I tried to run it on an AWS 16 vCPUs instance (c5.4xlarge) with the following CPU features:

Architecture:        x86_64
CPU op-mode(s):      32-bit, 64-bit
Byte Order:          Little Endian
CPU(s):              16
On-line CPU(s) list: 0-15
Thread(s) per core:  2
Core(s) per socket:  8
Socket(s):           1
NUMA node(s):        1
Vendor ID:           GenuineIntel
CPU family:          6
Model:               85
Model name:          Intel(R) Xeon(R) Platinum 8124M CPU @ 3.00GHz
Stepping:            4
CPU MHz:             3406.077
BogoMIPS:            6000.00
Hypervisor vendor:   KVM
Virtualization type: full
L1d cache:           32K
L1i cache:           32K
L2 cache:            1024K
L3 cache:            25344K
NUMA node0 CPU(s):   0-15
Flags:               fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca cmov pat pse36 clflush mmx fxsr sse sse2 ss ht syscall nx pdpe1gb rdtscp lm constant_tsc rep_good nopl xtopology nonstop_tsc cpuid aperfmperf tsc_known_freq pni pclmulqdq ssse3 fma cx16 pcid sse4_1 sse4_2 x2apic movbe popcnt tsc_deadline_timer aes xsave avx f16c rdrand hypervisor lahf_lm abm 3dnowprefetch invpcid_single pti fsgsbase tsc_adjust bmi1 hle avx2 smep bmi2 erms invpcid rtm mpx avx512f avx512dq rdseed adx smap clflushopt clwb avx512cd avx512bw avx512vl xsaveopt xsavec xgetbv1 xsaves ida arat pku ospke

I was very surprised by the figures (compiled with --release):

n=10000,#threads=2,mono_threaded=44.012086ms,2_threaded=32.418851ms
n=10000,#threads=3,mono_threaded=43.968149ms,3_threaded=30.384484ms
n=10000,#threads=4,mono_threaded=43.992862ms,4_threaded=30.209299ms
n=10000,#threads=5,mono_threaded=43.955757ms,5_threaded=30.472512ms
n=10000,#threads=6,mono_threaded=43.982963ms,6_threaded=31.045617ms
n=10000,#threads=7,mono_threaded=43.989174ms,7_threaded=31.62917ms
n=10000,#threads=8,mono_threaded=43.970739ms,8_threaded=32.388249ms
n=10000,#threads=9,mono_threaded=43.961397ms,9_threaded=33.293733ms
n=10000,#threads=10,mono_threaded=43.96888ms,10_threaded=33.96173ms
n=10000,#threads=11,mono_threaded=43.96635ms,11_threaded=35.110248ms
n=10000,#threads=12,mono_threaded=43.967169ms,12_threaded=35.596762ms
n=10000,#threads=13,mono_threaded=44.002396ms,13_threaded=36.439368ms
n=10000,#threads=14,mono_threaded=43.971672ms,14_threaded=36.733153ms
n=10000,#threads=15,mono_threaded=43.995898ms,15_threaded=36.985784ms

n=100000,#threads=2,mono_threaded=6.995495759s,2_threaded=4.920436418s
n=100000,#threads=3,mono_threaded=6.993108889s,3_threaded=4.590584105s
n=100000,#threads=4,mono_threaded=6.997765082s,4_threaded=4.491443494s
n=100000,#threads=5,mono_threaded=6.98674998s,5_threaded=4.454928738s
n=100000,#threads=6,mono_threaded=6.997957363s,6_threaded=4.437379922s
n=100000,#threads=7,mono_threaded=6.989226677s,7_threaded=4.436800568s
n=100000,#threads=8,mono_threaded=7.000204531s,8_threaded=4.442282736s
n=100000,#threads=9,mono_threaded=6.989630252s,9_threaded=4.461835927s
n=100000,#threads=10,mono_threaded=6.998050517s,10_threaded=4.47828486s
n=100000,#threads=11,mono_threaded=6.990986118s,11_threaded=4.493128341s
n=100000,#threads=12,mono_threaded=7.004086972s,12_threaded=4.504849881s
n=100000,#threads=13,mono_threaded=6.9846825s,13_threaded=4.518385463s
n=100000,#threads=14,mono_threaded=6.999432448s,14_threaded=4.533241342s
n=100000,#threads=15,mono_threaded=6.989213543s,15_threaded=4.546461652s

Optimal time is reached with 4 threads, and adding more doesn't bring any real advantage. Is this because of hyperthreading? What's your opinion?

Thanks for your comments !

You should use a profiler, but I'd guess the largest share of the work is in your final (serialized) product::<BigUint>(), as this is the part multiplying the largest factors. If you use Rayon's ParallelIterator::product(), it will be able to do some of these reductions in parallel too, though you'll still have at least one final large multiplication.

Also, the to_str_radix(10) calls are probably taking a lot of time for such a large result. You might want to move that outside of your timed range, since it is the same for the serial and parallel modes. Or don't even bother converting to a string at all, if you don't need it.

More of an aside, BigUint can also multiply with u32 operands directly, so you don't really need to convert to that Vec<BigUint> first. A Vec<u32> will do, or even a Range<u32> for Rayon.


@cuviper As always, thanks for your insightful comment. I was converting to a str to be able to compare results, but for sure it's not needed in the main computation task.

I'll try Rayon afterwards. Even with these flaws, it's weird that adding more threads doesn't decrease the time. I suspect the CPU architecture is only giving me around 4 truly parallel threads, so adding more isn't really useful. I may try a larger AWS instance.

Don't forget Amdahl's law too -- eventually your serial work will dominate over any parallel speedups.
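As a rough illustration of why the curve flattens (the 35% serial fraction below is an assumed number for illustration, not measured from the program above):

```rust
// Amdahl's law: with a fraction `s` of inherently serial work,
// the best possible speedup on `n` threads is 1 / (s + (1 - s) / n).
fn amdahl_speedup(s: f64, n: f64) -> f64 {
    1.0 / (s + (1.0 - s) / n)
}

fn main() {
    // With ~35% serial work, going past a handful of threads barely helps:
    for n in [2.0, 4.0, 8.0, 16.0] {
        println!("N={:>2}: predicted speedup = {:.2}", n, amdahl_speedup(0.35, n));
    }
}
```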

How would adding more threads help if the hardware only has 4 cores (i.e., 4 actual concurrent threads of execution)? Asking for more threads just gives the OS extra work switching between them on the 4 real hardware threads. That is only useful for blocking tasks, not computation. Asking for more computation threads than your hardware has cores simply adds context-switching overhead without any possible benefit.

Derp! I see above that you mentioned a 16 core machine. Whoops!


@gbutler69 I'm well aware that adding more threads to a 4-core CPU will not decrease the time. The Intel(R) Xeon(R) Platinum 8124M CPU @ 3.00GHz has 16 cores, or 18 depending on the source (https://en.wikichip.org/wiki/intel/xeon_platinum/8124).

Oops! I'm sorry, I missed that you said a 16-core machine. I could've sworn I read you had 4 cores, but, when I re-read, I see that is not the case. Apologies for the non-useful comment. :slight_smile:


Never mind, we all make mistakes :wink:

Also, with AWS, are you actually getting the full 16 CPUs that are claimed, or only "up to" 16 depending on AWS load? I wonder whether AWS provides 16 virtual CPUs that may or may not be occupied with other work, so asking for more threads won't necessarily get you a dedicated hardware thread on one of the CPU cores.

I don't really know. These are virtual CPUs, because the instance is a virtual machine. Anyway, I'm currently writing the same thing in C++ with libgmp (because big integers are not part of the standard library). I'll post the results soon.

Note that num-bigint doesn't claim to be the fastest around, as noted in its alternatives section. It doesn't have anywhere near the engineering time that GMP does. You could use rug for a GMP wrapper in Rust, to keep that part consistent with C++ while you compare parallelism.

Not sure how AWS does their pricing exactly, but a dedicated VPS is normally a fair bit more expensive than an ordinary one. So if you're not paying a lot, you might be on a normal VPS, where you don't get 100% of the CPU time. That hurts benchmark accuracy (to what extent, I don't know).

I don't know if that's the cause of the oddity here, but drawing conclusions from inaccurate benchmarks can be problematic.

EDIT: actually, the discrepancy shouldn't be this big even if you're being throttled, I think. So never mind my comment, possibly.

I modified my code a little: I got rid of the to_str_radix(10) call and looped over the thread count:

//---------------------------------------------------------------------------------------
// Runs different test scenarios
//---------------------------------------------------------------------------------------
fn main() {
    // manage arguments
    let args: Vec<String> = env::args().collect();
    if args.len() != 3 {
        println!("fact [upper_bound] [nb_thread]");
        return;
    }

    // get arguments
    let upper_bound = match args[1].parse::<u32>() {
        Ok(n) => n,
        Err(e) => panic!("error {} converting {} to an integer!", e, &args[1]),
    };
    let nb_threads = match args[2].parse::<u32>() {
        Ok(n) => n,
        Err(e) => panic!("error {} converting {} to an integer!", e, &args[2]),
    };

    // fill-in vector
    let v: Vec<BigUint> = (1..=upper_bound).map(|i| BigUint::from(i)).collect();

    // get time for the mono-threaded product
    let mut start = Instant::now();
    let mono_fact = v.iter().product::<BigUint>();
    let duration_mono = start.elapsed();

    // get time for multi-threaded computation
    for num_thread in 2..=nb_threads {
        start = Instant::now();
        let partial_fact = v.parallel_task(num_thread as usize, prod_fn);
        let multi_fact = partial_fact.iter().product::<BigUint>();
        let duration_multi = start.elapsed();

        // validity check: check if products are equal
        assert_eq!(mono_fact, multi_fact);

        println!(
            "n={}, #threads={}, mono_threaded={:?}, {}_threaded={:?}, ratio={:.6}",
            upper_bound,
            num_thread,
            duration_mono,
            num_thread,
            duration_multi,
            duration_multi.as_nanos() as f64 / duration_mono.as_nanos() as f64
        );
    }
}

but now I don't understand the results:

n=5000, #threads=2, mono_threaded=4.404379ms, 2_threaded=2.012359ms, ratio=0.456900
n=5000, #threads=3, mono_threaded=4.404379ms, 3_threaded=1.386292ms, ratio=0.314753
n=5000, #threads=4, mono_threaded=4.404379ms, 4_threaded=1.349952ms, ratio=0.306502
n=5000, #threads=5, mono_threaded=4.404379ms, 5_threaded=1.444267ms, ratio=0.327916
n=5000, #threads=6, mono_threaded=4.404379ms, 6_threaded=1.501525ms, ratio=0.340916
n=5000, #threads=7, mono_threaded=4.404379ms, 7_threaded=1.612581ms, ratio=0.366131
n=5000, #threads=8, mono_threaded=4.404379ms, 8_threaded=1.644026ms, ratio=0.373271
n=5000, #threads=9, mono_threaded=4.404379ms, 9_threaded=1.719807ms, ratio=0.390477
n=5000, #threads=10, mono_threaded=4.404379ms, 10_threaded=1.684499ms, ratio=0.382460
n=5000, #threads=11, mono_threaded=4.404379ms, 11_threaded=1.705943ms, ratio=0.387329
n=5000, #threads=12, mono_threaded=4.404379ms, 12_threaded=1.79041ms, ratio=0.406507
n=5000, #threads=13, mono_threaded=4.404379ms, 13_threaded=1.882652ms, ratio=0.427450
n=5000, #threads=14, mono_threaded=4.404379ms, 14_threaded=1.861951ms, ratio=0.422750
n=5000, #threads=15, mono_threaded=4.404379ms, 15_threaded=1.791403ms, ratio=0.406732
n=5000, #threads=16, mono_threaded=4.404379ms, 16_threaded=1.889838ms, ratio=0.429082
n=10000, #threads=2, mono_threaded=18.133941ms, 2_threaded=6.492539ms, ratio=0.358032
n=10000, #threads=3, mono_threaded=18.133941ms, 3_threaded=3.988311ms, ratio=0.219936
n=10000, #threads=4, mono_threaded=18.133941ms, 4_threaded=3.517385ms, ratio=0.193967
n=10000, #threads=5, mono_threaded=18.133941ms, 5_threaded=3.528933ms, ratio=0.194604
n=10000, #threads=6, mono_threaded=18.133941ms, 6_threaded=3.635387ms, ratio=0.200474
n=10000, #threads=7, mono_threaded=18.133941ms, 7_threaded=3.768576ms, ratio=0.207819
n=10000, #threads=8, mono_threaded=18.133941ms, 8_threaded=4.139041ms, ratio=0.228248
n=10000, #threads=9, mono_threaded=18.133941ms, 9_threaded=4.701418ms, ratio=0.259261
...

and the graph for n=20k, 50k, 75k, 100k, 150k, 200k is even more puzzling:

Optimal time is around 6-8 threads. The only explanation I have (apart from an obvious bug I can't spot) is some kind of optimization: with 2 threads it takes about 1/3 of the mono-threaded time, which is counter-intuitive.

Yes, you're right, I should use GMP to be fair. Anyway, I'm short of time to do it in C++ right now.

Going higher could well be suffering from hyperthreading effects, as you were first asking about. There's also the fact that you're combining the results from each thread in a serial product, so with more threads there are more of these final multiplications with large operands.

Welcome to super-linear speedups!

I think the likely explanation is that you're not comparing equal calculations. The serial form is just multiplying by the entire range of single-digit operands, so each multiplication requires a full sweep over the progressively larger accumulator. The parallel form is doing the same over smaller chunks/ranges in each thread, so those separate accumulators don't get as big, and then the final reduction multiplies those intermediate values together. Those final multiplications are more complicated, but you only have a few of them.

You could emulate the exact same chunking in the serial form, if you want to compare that effect alone.
