Rayon par_iter() is always slower than iter()

Hi,

I'm trying to parse a vector into a HashMap. I get pretty good performance using iter() and wanted to add data parallelism. Rayon and par_iter() seemed the best way to do this with minimal new code.

However, no matter how large the vector I want to parse, my par_iter() version is always slower (specifically at p999 latency).

Am I using par_iter(), map and reduce correctly? Or am I stumbling into a particular use case for which Rayon doesn't work well?

I've added a reproducible example below to demonstrate my issue. Any help would be hugely appreciated!!

use anyhow::Result;
use std::time::Instant;
use std::collections::HashMap;
use rayon::prelude::*;


fn main() -> Result<()>{
    // vector to hold all the recorded times
    let mut par_times = vec![];
    let mut seq_times = vec![];

    for _sl in  0..10 {
        // par_iter version
        let mut some_vec = vec![];
        for _i in 0..10_000_000{
            some_vec.push(1);
        }
        let start = Instant::now();
        let result: HashMap<&i32, i32> = some_vec
            .par_iter()
            .map(|s| {
                let mut m = HashMap::new();
                m.insert(s, s * 2);
                m
            })
            .reduce(
                || HashMap::new(),
                |m1, m2| {
                    m2.iter().fold(m1, |mut acc, (k, vs)| {
                        acc.entry(k.clone()).or_insert(*vs);
                        acc
                    })
                },
            );

        par_times.push(start.elapsed().as_micros());

        // iter version

        let mut some_vec = vec![];
        for _i in 0..10_000_000{
            some_vec.push(1);
        }

        let start = Instant::now();

        let mut result: HashMap<&i32, i32> = HashMap::new();

        some_vec.iter()
            .for_each(|s|
                {
                    result.insert(s, s* 2);
                }
            );

        seq_times.push(start.elapsed().as_micros());

    }
    
    /// function to calculate pXXX latencies 
    fn get_latency(mut vector_of_times: Vec<u128>){
        vector_of_times.sort();
        let vot = vector_of_times.len();
        let percentile_index_999 = (vot as f64 * (99.9 / 100.0)).floor() as usize;
        let percentile_index_50 = (vot as f64 * (50.0 / 100.0)).floor() as usize;
        let percentile_index_25 = (vot as f64 * (25.0 / 100.0)).floor() as usize;
        println!("p999 latency: {} us", vector_of_times[percentile_index_999]);
        println!("p50 latency: {} us", vector_of_times[percentile_index_50]);
        println!("p25 latency: {} us", vector_of_times[percentile_index_25]);

    }

    get_latency(par_times);
    get_latency(seq_times);

    Ok(())
}

You are simply not going to beat the single-threaded performance for a task like this. Hash maps do not really have a structure where multi-threading can help you insert a lot of items quickly.

Also, you should prefer to use i32 rather than &i32 as the key type for your map.
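
To illustrate the key-type choice, here is a minimal sketch of the same doubling map built with owned i32 keys instead of the HashMap<&i32, i32> from the question. The function name double_all is made up for this example. Since i32 is Copy, storing the value itself costs nothing extra, and the resulting map no longer borrows from the vector, so it can be returned or kept after the vector is dropped.

```rust
use std::collections::HashMap;

/// Hypothetical helper: maps each value to its double, using owned `i32` keys.
/// The returned map does not borrow from `values`.
fn double_all(values: &[i32]) -> HashMap<i32, i32> {
    // `|&v|` copies each i32 out of the slice, so the map owns its keys.
    values.iter().map(|&v| (v, v * 2)).collect()
}
```

Calling double_all(&some_vec) on the vector from the question would give a map with a single entry, because every element is 1 and later inserts overwrite the same key.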

ahh I see, so effectively the threshold for justifying data parallelism here is that the operation is sufficiently expensive?

Yep. Otherwise overheads like transferring data to each thread and collecting the results at the end will be more expensive than just doing the single-threaded operation.
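
To make those two overheads visible, here is a rough sketch that uses plain std::thread::scope rather than Rayon (a deliberate swap so the steps are explicit; it is not what Rayon does internally). The function name parallel_double and the num_chunks parameter are assumptions for this example. Each thread fills a private map for its slice, and the partial maps are then merged on one thread.

```rust
use std::collections::HashMap;
use std::thread;

/// Hypothetical helper: builds one map per chunk on its own thread,
/// then merges the partial maps sequentially.
fn parallel_double(values: &[i32], num_chunks: usize) -> HashMap<i32, i32> {
    // Round up so every element lands in a chunk; never pass 0 to `chunks`.
    let n = num_chunks.max(1);
    let chunk_size = ((values.len() + n - 1) / n).max(1);

    // Overhead 1: hand each thread its slice and let it fill a private map.
    let partial_maps: Vec<HashMap<i32, i32>> = thread::scope(|scope| {
        // Spawn all threads first, then join them.
        let handles: Vec<_> = values
            .chunks(chunk_size)
            .map(|chunk| {
                scope.spawn(move || {
                    let mut local = HashMap::new();
                    for &v in chunk {
                        local.insert(v, v * 2);
                    }
                    local
                })
            })
            .collect();
        handles.into_iter().map(|h| h.join().unwrap()).collect()
    });

    // Overhead 2: the merge runs on a single thread and re-inserts every entry.
    let mut merged = HashMap::new();
    for partial in partial_maps {
        merged.extend(partial);
    }
    merged
}
```

The per-element work here is one multiplication and one insert, so the merge step repeats roughly the same amount of hashing the sequential loop would have done in the first place. Splitting only tends to pay off when the work per element is much heavier than the insert itself.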

great, that makes sense!

Thanks both!
