How to insert into a HashMap from multiple threads at the same time?

I have a bunch of duplicate data in vectors. I want to remove duplicates by inserting them into a hashmap. However, inserting one value at a time is very slow if you have billions of elements. How do you insert values into a hashmap from multiple threads at the same time? Ignoring overwriting, data races, and atomics.


use std::collections::HashMap;

fn main() {
    // Duplicate data
    let key_vec =   vec![   5,    2,    6,    2,     5,     1];
    let value_vec = vec![-0.1,  0.2,  0.0,  0.2,  -0.1,  -0.3];

    let mut map = HashMap::new();

    // How to parallelize inserts from multiple threads?
    for i in 0..key_vec.len() {
        map.insert(key_vec[i].clone(), value_vec[i].clone());
    }
}

To answer the literal question of how to insert items into a HashMap from multiple threads at the same time, you have two options.

First, you could wrap the HashMap in a Mutex and let each thread lock the hashmap whenever it wants to add an item.

The other solution is to switch to a concurrency-aware hash map implementation like DashMap which handles the synchronisation in a much smarter way.
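As a minimal sketch of that second option, here is what inserting into a shared DashMap from scoped threads could look like (the two-way split and the numbers are just for illustration):

use dashmap::DashMap;

fn main() {
    let keys = vec![5u32, 2, 6, 2, 5, 1];
    let values = vec![-0.1, 0.2, 0.0, 0.2, -0.1, -0.3];

    // DashMap's insert takes &self, so the map can be shared by reference
    // across scoped threads without wrapping it in a Mutex ourselves.
    let map: DashMap<u32, f64> = DashMap::new();

    std::thread::scope(|s| {
        let map = &map;
        let mid = keys.len() / 2;
        // Give each thread its own half of the input.
        for (key_chunk, value_chunk) in [
            (&keys[..mid], &values[..mid]),
            (&keys[mid..], &values[mid..]),
        ] {
            s.spawn(move || {
                for (&k, &v) in key_chunk.iter().zip(value_chunk) {
                    map.insert(k, v);
                }
            });
        }
    });

    assert_eq!(map.len(), 4); // keys 5, 2, 6, 1 remain
}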

However, if you are just trying to populate a hash map as fast as possible, populating the same map from multiple threads will actually make your code slower, if it works at all. The reason is that you need to make sure concurrent inserts don't corrupt the hash map's internals. I'm not just talking about accidentally losing a value here or there; these sorts of data races can corrupt internal pointers and crash your entire program. Even if such crashes are infrequent, populating a hash map with a lot of values from a lot of threads in a tight loop is about the most adversarial thing you can do and will almost certainly trigger them.


A better solution is to look at the problem statement, "I want to deduplicate a large number of items really quickly by saving them to a hash map", and try to break that down.

First, is there a way to avoid the need for deduplication in the first place? You might be able to create the data in a way that naturally avoids duplicates. Alternatively, you could try to make sure whatever is consuming your data is able to cope with duplicate values. That lets us side-step the need for deduplication, effectively giving you a speed improvement of ∞%.

If we can't avoid the situation, we'll need to tackle it head-on.

One way to do this is to take a divide-and-conquer approach - split the inputs into chunks and deduplicate each chunk in its own thread, then merge all the deduplicated chunks into one big HashMap. Depending on the amount of data we are talking about, this may work for you. You may also find that the cost of running things concurrently (synchronisation between threads, creating lots of intermediate HashMaps and merging them, etc.) makes your code a lot slower. As usual, you'll want to measure first.

As an unscientific example, here is some code for deduplicating in parallel using the rayon crate (the gold standard for doing parallel operations in Rust).

use std::collections::HashMap;
use rayon::prelude::*;

fn deduplicate_sequential(keys: &[u32], values: &[f64]) -> HashMap<u32, f64> {
    assert_eq!(keys.len(), values.len());

    keys.iter().copied().zip(values.iter().copied()).collect()
}

fn deduplicate_in_parallel(keys: &[u32], values: &[f64]) -> HashMap<u32, f64> {
    assert_eq!(keys.len(), values.len());

    // Get a parallel iterator of (u32, f64) pairs
    let pairs = keys.par_iter().copied().zip(values.par_iter().copied());

    // Each rayon worker folds its share of the pairs into its own HashMap
    let deduplicated_chunks = pairs.fold(HashMap::new, |mut map, (key, value)| {
        map.insert(key, value);
        map
    });

    // Merge the per-worker maps pairwise into one final HashMap
    let merged = deduplicated_chunks.reduce(HashMap::new, |mut left_map, right_map| {
        left_map.extend(right_map);
        left_map
    });

    merged
}

(playground)

Running it in the playground with 1 million elements and compiled in debug mode gives me this:

Deduplicating 999896 items sequentially took 591.367755ms
Deduplicating 999896 items in parallel took 3.632163332s

In release mode:

Deduplicating 999882 items sequentially took 60.152989ms
Deduplicating 999882 items in parallel took 279.821825ms

Running the same code on a recent M1 MacBook shows about a 3x speed-up over the playground numbers, but deduplicating sequentially is still considerably faster.


@Michael-F-Bryan
Thanks for the detailed reply. I guess multithreaded inserts are slower than single-threaded inserts into hashmaps because two threads can't write to the same memory location at the same time.

I am looking into other implementations of hashmaps. I don't care about hash security or data order because I only want to deduplicate keys in the hashmap. Is there a crate similar to nohash-hasher for fast hashes? That one seems to be abandoned.

Are you actually looking into other implementations of hashmaps or just hashers?

What kind of maintenance do you expect for a crate like that? It does something extremely simple. The actual code is like ~150 lines and it's mostly boilerplate.
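For a sense of scale, here's a rough sketch of the core of such an identity hasher (not nohash-hasher's actual code, and it assumes integer keys):

use std::collections::HashMap;
use std::hash::{BuildHasherDefault, Hasher};

// The "hash" of an integer key is just the key itself.
#[derive(Default)]
struct IdentityHasher(u64);

impl Hasher for IdentityHasher {
    fn finish(&self) -> u64 {
        self.0
    }

    fn write(&mut self, _bytes: &[u8]) {
        // Only fixed-size integer keys are supported in this sketch.
        unimplemented!("use write_u32/write_u64 instead");
    }

    fn write_u32(&mut self, n: u32) {
        self.0 = u64::from(n);
    }

    fn write_u64(&mut self, n: u64) {
        self.0 = n;
    }
}

type IdentityMap<V> = HashMap<u32, V, BuildHasherDefault<IdentityHasher>>;

fn main() {
    let mut map: IdentityMap<f64> = IdentityMap::default();
    map.insert(5, -0.1);
    map.insert(5, -0.1); // duplicate key, simply overwritten
    assert_eq!(map.len(), 1);
}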


Assuming you don't actually want a HashMap, but only want to use it to drop duplicates, this might be a starting point.

I was actually able to get a tiny speedup here over the baseline by splitting the data into buckets, such that duplicate values are guaranteed to fall in the same bucket.
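A rough sketch of that bucketing idea (assuming u32 keys; the actual playground code may look different):

use std::collections::HashMap;
use rayon::prelude::*;

fn deduplicate_with_buckets(keys: &[u32], values: &[f64], buckets: usize) -> Vec<HashMap<u32, f64>> {
    // Assign each pair to a bucket based on its key, so equal keys always
    // end up in the same bucket.
    let mut partitioned: Vec<Vec<(u32, f64)>> = vec![Vec::new(); buckets];
    for (&k, &v) in keys.iter().zip(values) {
        partitioned[k as usize % buckets].push((k, v));
    }

    // Each bucket can now be deduplicated independently in parallel; no
    // merge step is needed because duplicates never cross buckets.
    partitioned
        .into_par_iter()
        .map(|bucket| bucket.into_iter().collect())
        .collect()
}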

There are lots of ways you could do better, especially if you are willing to sacrifice precision! 🙂

...Actually, I found that if you increase the workload until it takes a full 1 second to run the sequential version, the parallel version runs in 300ms — 3x faster! On my machine, anyway.

@SkiFire13

I am looking for hashmaps that support multithreading (splitting and joining threads), AVX optimization (certain instructions can write to multiple memory locations at the same time), and an identity hash function.

@jorendorff
I am not familiar with Rust's rayon. Do the threads join one by one (O(n-1)) or in parallel (O(log2(n))) for the bucketing?

Rayon will be logarithmic.

Overall, the rayon approach heavily depends on how much deduplication there will actually be. If a lot of items survive, then you spend a lot more time merging and reallocating in the reduce step. We have some benchmarks in the rayon repo of different collect strategies we tried:

test map_collect::i_mod_10_to_i::with_collect                           ... bench:   2,158,359 ns/iter (+/- 64,941)
test map_collect::i_mod_10_to_i::with_fold                              ... bench:     456,081 ns/iter (+/- 16,461)
test map_collect::i_mod_10_to_i::with_fold_vec                          ... bench:     575,579 ns/iter (+/- 29,454)
test map_collect::i_mod_10_to_i::with_linked_list_collect               ... bench:   4,327,805 ns/iter (+/- 261,511)
test map_collect::i_mod_10_to_i::with_linked_list_collect_vec           ... bench:   2,350,412 ns/iter (+/- 101,515)
test map_collect::i_mod_10_to_i::with_linked_list_collect_vec_sized     ... bench:   2,413,770 ns/iter (+/- 119,020)
test map_collect::i_mod_10_to_i::with_linked_list_map_reduce_vec_sized  ... bench:   2,382,592 ns/iter (+/- 83,305)
test map_collect::i_mod_10_to_i::with_mutex                             ... bench:  28,778,848 ns/iter (+/- 2,216,573)
test map_collect::i_mod_10_to_i::with_mutex_vec                         ... bench:   6,243,400 ns/iter (+/- 1,033,037)
test map_collect::i_mod_10_to_i::with_vec_vec_sized                     ... bench:   2,442,997 ns/iter (+/- 149,437)
test map_collect::i_to_i::with_collect                                  ... bench:   3,875,987 ns/iter (+/- 127,681)
test map_collect::i_to_i::with_fold                                     ... bench:  12,839,940 ns/iter (+/- 1,007,096)
test map_collect::i_to_i::with_fold_vec                                 ... bench:  12,852,246 ns/iter (+/- 1,583,942)
test map_collect::i_to_i::with_linked_list_collect                      ... bench:   7,552,594 ns/iter (+/- 595,211)
test map_collect::i_to_i::with_linked_list_collect_vec                  ... bench:   7,704,584 ns/iter (+/- 368,471)
test map_collect::i_to_i::with_linked_list_collect_vec_sized            ... bench:   4,332,625 ns/iter (+/- 274,807)
test map_collect::i_to_i::with_linked_list_map_reduce_vec_sized         ... bench:   4,344,150 ns/iter (+/- 191,931)
test map_collect::i_to_i::with_mutex                                    ... bench:  45,221,873 ns/iter (+/- 365,509)
test map_collect::i_to_i::with_mutex_vec                                ... bench:  19,704,084 ns/iter (+/- 2,728,435)
test map_collect::i_to_i::with_vec_vec_sized                            ... bench:   4,373,531 ns/iter (+/- 270,386)

The i_mod_10 case has keys generated with i % 10, so there is extreme duplication, and that's where fold wins. But for the general FromParallelIterator for HashMap<K, V> implementation, we went with the strategy that collects a LinkedList<Vec<(K, V)>> and then moves everything into a HashMap sequentially afterward.
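Roughly, that strategy looks something like this (a simplified sketch, not rayon's actual implementation):

use std::collections::{HashMap, LinkedList};
use rayon::prelude::*;

// Each worker collects its items into a Vec, the Vecs are gathered into a
// LinkedList (cheap to concatenate), and the final HashMap is built
// sequentially at the end.
fn collect_via_linked_list(keys: &[u32], values: &[f64]) -> HashMap<u32, f64> {
    let lists: LinkedList<Vec<(u32, f64)>> = keys
        .par_iter()
        .copied()
        .zip(values.par_iter().copied())
        .fold(Vec::new, |mut v, pair| {
            v.push(pair);
            v
        })
        .map(|v| {
            let mut list = LinkedList::new();
            list.push_back(v);
            list
        })
        .reduce(LinkedList::new, |mut left, mut right| {
            left.append(&mut right);
            left
        });

    // Sequential final pass: move everything into one HashMap.
    let mut map = HashMap::new();
    for vec in lists {
        map.extend(vec);
    }
    map
}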

If you're not producing these items with parallel computation, it probably doesn't make much sense to involve rayon at all.


@supermagiclol You're advanced enough that I'm definitely not going to be able to answer all your questions about Rayon. Play with it, read the book, look at the source. It's great software.

Something I might want to try is having an initial hash table that's made of atomic ints, falling back on a Mutex<HashSet<Key>>, or possibly an array of them to reduce contention further.
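That could look roughly like this (a hypothetical sketch, assuming non-zero u32 keys so that 0 can mark an empty slot):

use std::collections::HashSet;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Mutex;

// A small open-addressed table of atomic slots absorbs most keys; anything
// that doesn't fit falls back to a Mutex-protected HashSet.
struct AtomicDedup {
    slots: Vec<AtomicU32>,
    overflow: Mutex<HashSet<u32>>,
}

impl AtomicDedup {
    fn new(capacity: usize) -> Self {
        AtomicDedup {
            slots: (0..capacity).map(|_| AtomicU32::new(0)).collect(),
            overflow: Mutex::new(HashSet::new()),
        }
    }

    /// Returns true if `key` was newly inserted, false if it was a duplicate.
    fn insert(&self, key: u32) -> bool {
        let mut idx = key as usize % self.slots.len();
        // Probe a handful of slots before giving up.
        for _ in 0..8 {
            match self.slots[idx].compare_exchange(0, key, Ordering::Relaxed, Ordering::Relaxed) {
                Ok(_) => return true,                             // claimed an empty slot
                Err(existing) if existing == key => return false, // already present
                Err(_) => idx = (idx + 1) % self.slots.len(),     // collision, keep probing
            }
        }
        // This region of the table is full: fall back to the locked set.
        self.overflow.lock().unwrap().insert(key)
    }
}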

The nice thing about Rust is that it is so easy to experiment. There are 30 different 10-line solutions to this problem, and it's easy to write them and actually try them out. Look:

use rayon::prelude::*;

fn deduplicate_in_parallel(data: &mut Vec<(u32, f64)>) {
    // Parallel sort brings duplicate keys next to each other...
    data.par_sort_by_key(|pair| pair.0);
    // ...so a single sequential pass can drop them.
    data.dedup_by_key(|pair| pair.0);
}

Turns out this is competitive, even though the second half is single-threaded.

@jorendorff
@cuviper
Thanks, everyone, for the help.
I am going to try using rayon parallel iterators for splitting the work, collecting into a LinkedList, and then converting that into a HashMap.

In case it wasn't clear, rayon's collect() will do the LinkedList intermediate for you -- or whatever future scheme we may find better!
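For example, something along these lines should be enough (a sketch; the function name is made up):

use std::collections::HashMap;
use rayon::prelude::*;

// rayon's FromParallelIterator impl picks the intermediate strategy for you.
fn deduplicate_with_collect(keys: &[u32], values: &[f64]) -> HashMap<u32, f64> {
    keys.par_iter()
        .copied()
        .zip(values.par_iter().copied())
        .collect()
}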


You can use Arc to share data between threads, and Mutex to make the shared data safely mutable by locking it so that only one thread changes it at a time.

use std::{
    collections::HashMap,
    sync::{Arc, Mutex},
    thread::spawn,
};

fn main() {
    // Duplicate data
    let key_vec = Arc::new(vec![5, 2, 6, 2, 5, 1]);
    let value_vec = Arc::new(vec![-0.1, 0.2, 0.0, 0.2, -0.1, -0.3]);

    // Atomic reference counter around a mutex that guards the HashMap
    // If you need the map sorted by key, use BTreeMap instead of HashMap
    let map = Arc::new(Mutex::new(HashMap::new()));

    // Create two threads, keeping their handles so we can join them later
    let mut handles = Vec::new();
    for th in 0..2 {
        // Clone the Arcs so each thread gets its own reference
        let key_vec = key_vec.clone();
        let value_vec = value_vec.clone();
        let map = map.clone();

        handles.push(spawn(move || {
            for i in 0..key_vec.len() {
                if th == 0 && i % 2 == 0 {
                    // Lock the map; other threads wait until the guard is dropped
                    let mut map = map.lock().unwrap();
                    map.insert(key_vec[i], value_vec[i]);
                    // Manually unlock the mutex early if needed
                    drop(map);
                } else if th == 1 && i % 2 != 0 {
                    let mut map = map.lock().unwrap();
                    map.insert(key_vec[i], value_vec[i]);
                    // The guard is dropped (and the lock released) at the end of this block
                }
            }
        }));
    }

    // Wait for both threads to finish instead of sleeping in the main thread
    for handle in handles {
        handle.join().unwrap();
    }

    // Result
    println!("map: {:?}", &map);
}
