Hi all,
Continuing my exploration of Rust threads from my earlier topic "Help for my parallel sum", I tried to build a parallelized factorial of big numbers, using the num::bigint::BigUint type:
use std::clone::Clone;
use std::iter::{Product, Sum};
use std::sync::mpsc;
use std::time::Instant;
use std::env;

extern crate num;
use num::bigint::BigUint;

// function type which will run in each thread
type ChunkTask<'a, T> = fn(&'a [T]) -> T;

//---------------------------------------------------------------------------------------
// trait to call its fn directly from a Vec<T>
//---------------------------------------------------------------------------------------
pub trait ParallelTask<T> {
    // distribute work among threads. As a result, we'll get a Vec<T> holding the results of the thread tasks
    fn parallel_task<'a>(&'a self, nb_threads: usize, computation: ChunkTask<'a, T>) -> Vec<T>
    where
        T: 'a + Send + Sync;
}

impl<T> ParallelTask<T> for [T] {
    fn parallel_task<'a>(&'a self, nb_threads: usize, computation: ChunkTask<'a, T>) -> Vec<T>
    where
        T: 'a + Send + Sync,
    {
        // figure out the right chunk size for the number of threads, rounded up
        // (see the small illustration after the listing)
        let chunk_size = (self.len() + nb_threads - 1) / nb_threads;

        // create the channel to be able to receive partial results from the threads
        let (sender, receiver) = mpsc::channel::<T>();

        // create an empty vector which will receive all computed values from the child threads
        let mut values: Vec<T> = Vec::new();

        crossbeam::scope(|scope| {
            // create threads: each thread will compute its partial result
            for chunk in self.chunks(chunk_size) {
                // each thread gets its individual sender
                let thread_sender = sender.clone();

                // spawn thread
                scope.spawn(move |_| {
                    // call the dedicated computation fn
                    let partial_result: T = computation(chunk);

                    // send it through the channel
                    thread_sender.send(partial_result).unwrap();
                });
            }

            // drop our remaining sender, so the receiver won't wait for it
            drop(sender);

            // collect the partial results from all threads
            values = receiver.iter().collect();
        })
        .unwrap();

        values
    }
}
//---------------------------------------------------------------------------------------
// Samples of computation functions in each thread
//---------------------------------------------------------------------------------------
// product of elements
fn prod_fn<'a, T: Product<&'a T>>(chunk: &'a [T]) -> T {
    chunk.iter().product::<T>()
}
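// sum of elements: another possible ChunkTask, shown here only as an example
// (it mirrors my earlier parallel-sum experiment and is not called in main() below)
#[allow(dead_code)]
fn sum_fn<'a, T: Sum<&'a T>>(chunk: &'a [T]) -> T {
    chunk.iter().sum::<T>()
}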
//---------------------------------------------------------------------------------------
// Runs the different test scenarios
//---------------------------------------------------------------------------------------
fn main() {
    // manage arguments
    let args: Vec<String> = env::args().collect();
    if args.len() != 3 {
        println!("usage: fact [upper_bound] [nb_threads]");
        return;
    }

    // get arguments
    let upper_bound = match args[1].parse::<u32>() {
        Ok(n) => n,
        Err(e) => panic!("error {} converting {} to an integer!", e, &args[1]),
    };
    let nb_threads = match args[2].parse::<u32>() {
        Ok(n) => n,
        Err(e) => panic!("error {} converting {} to an integer!", e, &args[2]),
    };

    // fill in the vector with 1..=upper_bound as BigUint values
    let v: Vec<BigUint> = (1..=upper_bound).map(|i| BigUint::new(vec![i])).collect();

    // get time for the mono-threaded product
    let mut start = Instant::now();
    let mono_fact = v.iter().product::<BigUint>().to_str_radix(10);
    let duration_mono = start.elapsed();

    // get time for the multi-threaded computation
    start = Instant::now();
    let partial_fact = v.parallel_task(nb_threads as usize, prod_fn);
    let multi_fact = partial_fact.iter().product::<BigUint>().to_str_radix(10);
    let duration_multi = start.elapsed();

    // validity check: make sure both products are equal
    assert_eq!(mono_fact, multi_fact);

    println!(
        "n={},#threads={},mono_threaded={:?},{}_threaded={:?}",
        upper_bound, nb_threads, duration_mono, nb_threads, duration_multi
    );
}
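A quick aside on the chunking mentioned in the code: because chunk_size is rounded up, self.chunks(chunk_size) yields at most nb_threads chunks and only the last one can be shorter, so exactly one thread is spawned per chunk. A small standalone illustration (not part of the program above):
fn main() {
    let v: Vec<u32> = (1..=10).collect();
    let nb_threads = 4usize;
    let chunk_size = (v.len() + nb_threads - 1) / nb_threads; // (10 + 3) / 4 = 3
    let sizes: Vec<usize> = v.chunks(chunk_size).map(|c| c.len()).collect();
    println!("{:?}", sizes); // prints [3, 3, 3, 1]: 4 chunks, hence 4 spawned threads
}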
I then ran it on an AWS instance with 16 vCPUs (c5.4xlarge), with the following CPU characteristics:
Architecture: x86_64
CPU op-mode(s): 32-bit, 64-bit
Byte Order: Little Endian
CPU(s): 16
On-line CPU(s) list: 0-15
Thread(s) per core: 2
Core(s) per socket: 8
Socket(s): 1
NUMA node(s): 1
Vendor ID: GenuineIntel
CPU family: 6
Model: 85
Model name: Intel(R) Xeon(R) Platinum 8124M CPU @ 3.00GHz
Stepping: 4
CPU MHz: 3406.077
BogoMIPS: 6000.00
Hypervisor vendor: KVM
Virtualization type: full
L1d cache: 32K
L1i cache: 32K
L2 cache: 1024K
L3 cache: 25344K
NUMA node0 CPU(s): 0-15
Flags: fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca cmov pat pse36 clflush mmx fxsr sse sse2 ss ht syscall nx pdpe1gb rdtscp lm constant_tsc rep_good nopl xtopology nonstop_tsc cpuid aperfmperf tsc_known_freq pni pclmulqdq ssse3 fma cx16 pcid sse4_1 sse4_2 x2apic movbe popcnt tsc_deadline_timer aes xsave avx f16c rdrand hypervisor lahf_lm abm 3dnowprefetch invpcid_single pti fsgsbase tsc_adjust bmi1 hle avx2 smep bmi2 erms invpcid rtm mpx avx512f avx512dq rdseed adx smap clflushopt clwb avx512cd avx512bw avx512vl xsaveopt xsavec xgetbv1 xsaves ida arat pku ospke
I was very surprised by the figures (compiled with --release):
n=10000,#threads=2,mono_threaded=44.012086ms,2_threaded=32.418851ms
n=10000,#threads=3,mono_threaded=43.968149ms,3_threaded=30.384484ms
n=10000,#threads=4,mono_threaded=43.992862ms,4_threaded=30.209299ms
n=10000,#threads=5,mono_threaded=43.955757ms,5_threaded=30.472512ms
n=10000,#threads=6,mono_threaded=43.982963ms,6_threaded=31.045617ms
n=10000,#threads=7,mono_threaded=43.989174ms,7_threaded=31.62917ms
n=10000,#threads=8,mono_threaded=43.970739ms,8_threaded=32.388249ms
n=10000,#threads=9,mono_threaded=43.961397ms,9_threaded=33.293733ms
n=10000,#threads=10,mono_threaded=43.96888ms,10_threaded=33.96173ms
n=10000,#threads=11,mono_threaded=43.96635ms,11_threaded=35.110248ms
n=10000,#threads=12,mono_threaded=43.967169ms,12_threaded=35.596762ms
n=10000,#threads=13,mono_threaded=44.002396ms,13_threaded=36.439368ms
n=10000,#threads=14,mono_threaded=43.971672ms,14_threaded=36.733153ms
n=10000,#threads=15,mono_threaded=43.995898ms,15_threaded=36.985784ms
n=100000,#threads=2,mono_threaded=6.995495759s,2_threaded=4.920436418s
n=100000,#threads=3,mono_threaded=6.993108889s,3_threaded=4.590584105s
n=100000,#threads=4,mono_threaded=6.997765082s,4_threaded=4.491443494s
n=100000,#threads=5,mono_threaded=6.98674998s,5_threaded=4.454928738s
n=100000,#threads=6,mono_threaded=6.997957363s,6_threaded=4.437379922s
n=100000,#threads=7,mono_threaded=6.989226677s,7_threaded=4.436800568s
n=100000,#threads=8,mono_threaded=7.000204531s,8_threaded=4.442282736s
n=100000,#threads=9,mono_threaded=6.989630252s,9_threaded=4.461835927s
n=100000,#threads=10,mono_threaded=6.998050517s,10_threaded=4.47828486s
n=100000,#threads=11,mono_threaded=6.990986118s,11_threaded=4.493128341s
n=100000,#threads=12,mono_threaded=7.004086972s,12_threaded=4.504849881s
n=100000,#threads=13,mono_threaded=6.9846825s,13_threaded=4.518385463s
n=100000,#threads=14,mono_threaded=6.999432448s,14_threaded=4.533241342s
n=100000,#threads=15,mono_threaded=6.989213543s,15_threaded=4.546461652s
The optimal time is reached with about 4 threads (6-7 for n=100000), and adding more threads doesn't bring any real advantage. Is this because of hyperthreading? What's your opinion?
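One thing I still want to check is how much of the multi-threaded time goes into the final, single-threaded merge of the partial products. Here is the kind of extra timing I have in mind, as a sketch only, reusing v, nb_threads and prod_fn from the program above inside main():
// time the parallel phase and the final sequential merge separately
let start = Instant::now();
let partial_fact = v.parallel_task(nb_threads as usize, prod_fn);
let duration_parallel = start.elapsed();

let start = Instant::now();
let multi_fact = partial_fact.iter().product::<BigUint>().to_str_radix(10);
let duration_merge = start.elapsed();

println!(
    "parallel phase={:?}, final merge={:?}, total={:?}",
    duration_parallel,
    duration_merge,
    duration_parallel + duration_merge
);
If most of the time turns out to be spent in the merge, that alone would limit the benefit of extra threads, whatever hyperthreading does.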
Thanks for your comments!