Multicore dedup Vec<(u64, u64)>?

We have:

t: Vec<(u64, u64)>

t has 1_000_000_000 elements, and is NOT guaranteed to be in sorted order.

I would like to obtain some collection / iterator that contains all the unique elements of t. A dumb way would be:

t.iter().collect::<HashSet<_>>();

or we could use BTreeSet.

Or we could sort and dedup.

question:

Is there a way to do this on a multi-core machine? In particular, on a 72-core, 144-thread machine, I'd prefer to get something like a 50x speedup.

pre-emptive

If you have a split-merge solution, please describe it in detail. In all the ones I can think of, the last step is single-threaded, and takes two large objects and creates another large object (so, by Amdahl's law, it limits the parallelism of the entire system).

data distribution

We cannot make any assumptions about the distribution of t. Assume it is adversarially chosen.

First I would measure how fast sort_unstable + dedup is. I suspect it will do quite well. I would even suggest running sort_unstable + dedup and collect::<HashSet<_>>() on two threads in parallel and just waiting for the faster one.

However, here's a different idea: Pick a super simple (i.e. very fast) hash function that is randomly seeded and use it to assign each (u64, u64) to a thread. Then have each thread loop over the array and insert every value assigned to it into a HashSet. The answer is then given by the combined contents of the hash sets; they are guaranteed to have no overlap.

By randomly seeding the hash function, this will evenly distribute the values between the threads even if they are adversarially chosen, unless the number of distinct elements is really small. However, you can use a streaming algorithm (e.g. HyperLogLog) to estimate the number of unique elements and use a much simpler split-merge in that case; the last single-threaded step will be extremely fast when the number of unique elements is small.
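A minimal sketch of that scheme, assuming std scoped threads and using the standard library's RandomState as the randomly seeded hash (a purpose-built fast hash would do better); par_unique is just an illustrative name:

use std::collections::hash_map::RandomState;
use std::collections::HashSet;
use std::hash::BuildHasher;
use std::thread;

// Sketch: every thread scans the whole array but keeps only the values the
// randomly seeded hash assigns to it, so the per-thread sets are disjoint by
// construction and no merge step is needed.
fn par_unique(t: &[(u64, u64)], threads: usize) -> Vec<HashSet<(u64, u64)>> {
    let seed = RandomState::new(); // freshly (randomly) seeded
    thread::scope(|s| {
        (0..threads)
            .map(|id| {
                let seed = &seed;
                s.spawn(move || {
                    t.iter()
                        .copied()
                        .filter(|x| seed.hash_one(x) as usize % threads == id)
                        .collect::<HashSet<_>>()
                })
            })
            .collect::<Vec<_>>()
            .into_iter()
            .map(|handle| handle.join().unwrap())
            .collect()
    })
}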


1_000_000_000 elements × 16 bytes, so that's 16 GB. I suspect any algorithm here will be memory-bound, and thus I would not expect significant speedups from throwing more cores at the problem.


If the size of the output is expected to be significantly smaller than the input, that could help make such a split-merge solution viable, but I guess we cannot make that assumption.


Or we could sort in parallel! par_sort_unstable. (I guess that still won't give you a 50× speedup, even given ∞ memory bus speed, as sorting will do at least one sequential pass over all the data.)
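For instance, a minimal sketch assuming the rayon crate (parallel_sort_dedup is just an illustrative name):

use rayon::prelude::*;

fn parallel_sort_dedup(mut t: Vec<(u64, u64)>) -> Vec<(u64, u64)> {
    // Parallel unstable sort across all available threads.
    t.par_sort_unstable();
    // Single sequential pass dropping adjacent duplicates.
    t.dedup();
    t
}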


How accurate does this need to be? Is it ok, for example, for a small percentage of non-duplicated items to be erroneously dropped?

My instinct here is to use something like a Bloom filter to keep track of the emitted items (though I haven’t worked out exactly how).

On most CPUs a single core isn't able to saturate memory bandwidth, so using multiple cores could help overcome that, but then you need an approach where there isn't a single-core bottleneck merging the results from the threads at the end.
I don't think a concurrent data structure would help either, because those tend to do pointer-chasing or only use a few bytes per cache line loaded, so you're not making full use of the memory bandwidth either.

Well, the problem would be trivial if the duplication factor were large; in that case the unification would have to touch a lot less data.

Is this theoretical task or a practical one?

We have lots of cores, though, and some kind of modern CPU. That means we have AES or maybe even SHA-256 in hardware, which means we can split into 1000 (or 10000) buckets more or less fairly.

If some buckets are so large that we cannot distribute the work over 144 threads adequately, then we know there are lots of duplicates in those buckets and can rely on that. Otherwise we can just process each bucket individually and merge them at the end (without sorting… or, rather, everything would be sorted on the basis of hash value, not original value).

If that's a theoretical task then we haven't achieved anything, because a God-like adversary may give us lots of hash collisions.

But in practice, if you use a cryptographic hash, it's not a concern.

Don't even dream about that. The task is memory-bound: you need to read and then write the numbers at least once. A single core cannot saturate the memory bus in a modern system, but it can achieve about 10-20% of it, which means that no matter what you do, the final speedup would be around 5x-10x.


You should

  • measure your current implementation
  • determine single-core memory bandwidth of your machine
  • and multi-core memory bandwidth

And then see how close you are to the lower bound.

If the gap between your current implementation and the single-core lower bound is much larger than the gap between single- and multi-core lower bounds then most of the gains you can make will be by picking a better single-core impl, not by parallelizing your current implementation.
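For a rough sense of those two bandwidth numbers, something along these lines can work (a sketch assuming the rayon crate; a dedicated benchmark such as STREAM will be more accurate):

use rayon::prelude::*;
use std::hint::black_box;
use std::time::Instant;

fn main() {
    // A buffer much larger than L3, so we are really measuring main memory.
    let buf: Vec<u64> = vec![1; 1 << 28]; // 2 GiB of u64s
    let gb = (buf.len() * 8) as f64 / 1e9;

    let start = Instant::now();
    black_box(buf.iter().sum::<u64>());
    println!("single-core read: {:.1} GB/s", gb / start.elapsed().as_secs_f64());

    let start = Instant::now();
    black_box(buf.par_iter().sum::<u64>());
    println!("multi-core read:  {:.1} GB/s", gb / start.elapsed().as_secs_f64());
}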

Or we could sort and dedup.

An implementation that rolls the dedup into the sorting could get some constant-factor speedup by avoiding one extra pass over the data.

We cannot make any assumptions about the distribution of t. Assume it is adversarially chosen.

So, possibly very few (but not zero) duplicates, which means no gains from size reduction, but any solution relying on in-place dedup (e.g. of a Vec) would still have to move all the data around.


After you deal with sorting, Itertools::dedup will give you the unique items without any further data movement. But if you need it more than once, Vec::dedup is probably still worth it.
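For example, a small sketch assuming the itertools crate:

use itertools::Itertools;

// Assuming the slice is already sorted: yield each unique element once,
// borrowing from the slice instead of moving data around.
fn unique_after_sort(t: &[(u64, u64)]) -> impl Iterator<Item = &(u64, u64)> {
    t.iter().dedup()
}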

Here's a kludgy way to do a parallel dedup iterator from a slice with rayon:

use rayon::prelude::*;

pub fn par_dedup<T: Eq + Sync>(slice: &[T]) -> impl ParallelIterator<Item = &T> {
    slice
        // Look at every adjacent pair in parallel...
        .par_windows(2)
        .filter_map(|win| match win {
            // ...keeping an element only where it differs from its successor,
            // i.e. at the end of each run of equal values.
            [x, y] if x != y => Some(x),
            _ => None,
        })
        // The final element always ends its run, so append it.
        .chain(slice.last())
}

Rayon collect isn't great with non-indexed iterators though, so I wouldn't go this way unless you're ultimately aiming for something that can then use an efficient fold+reduce.

Unfortunately, it needs to be exact. This is not for tracking statistics; think tracking 1 billion ants in a simulation. We can't have clusters of ants just vanish. :slight_smile:

Thanks everyone for the ideas. I think the key insight I did not realize, mentioned by @krdln, @the8472, and @VorfeedCanal, is:

  1. at 16 GB of data, this problem is memory-bandwidth bound

  2. making up some numbers here: if system bandwidth is 2 GB/s and single-core memory bandwidth is 1 GB/s, then even with an infinite # of processors I'm not getting more than a 2x performance increase

Thus, measuring memory bandwidth should provide a hard limit on what multicore can buy us.

Not necessarily. Note that a merge sort on a single core would pass over the whole thing about 30 times (log2 of 1_000_000_000 ≈ 30 merge levels). But the first ~10 would stay within L1 cache, so we can ignore them. A few more would stay within L2 and L3 cache… around 10-15 passes would hit main RAM and would be slow.

The approach I discussed needs around 2 passes, but requires very careful optimisation to ensure your cores don't turn sequential access into random reads or writes against main RAM (those are so slow you may end up with a version that is slower than the single-core one!). This requires very careful thinking, but doesn't sound impossible. Add a 2-3x speedup from the fact that you are using lots of cores, and 5x-10x doesn't sound impossible.

50x speedup, on the other hand, doesn't sound realistic.

Perhaps I misunderstood your suggestion. Let me try again.

  1. We pick some random short string k.
  2. We assume (incorrectly) that we can do sha256(k, u64, u64) % 100 in a single instruction.

my understanding of the algorithm

  3. We take our input data t and split it into t_0, t_1, ..., t_99 via the sha % 100 above.

  4. With high probability, we get 100 Vecs, each of length ~10_000_000.

  5. Then we run sort + dedup on each of the 100 Vecs (of ~10_000_000 elements each) in parallel.

analysis

Step (3) can either be (a) single-threaded or (b) multi-threaded. If we go with (b), it looks like lots of concurrency fun. If we go with (a), we make a single pass through the data, which means we can get at most a 20x improvement (since the single-threaded merge sort, after discounting L1, makes ~20 passes through the data).

It's not obvious to me how to analyze (5). Although we have 100 threads operating in parallel, during the later stages of the merge sort it seems like they are going to be contending for memory bandwidth.

Is this why, instead of #number-of-threads buckets, you were going for {1000, 10000} buckets? You want each independent task of (5) to fit in L3 cache?

The adversary is not God-like; it's a possibly-coordinating online community that, for entertainment, may try to crash/lag the server.

EDIT: possibly adversarial, possibly coordinating, limited computational power

We don't assume that. The task is memory-bound. According to the first post we have 72 cores, which means we need to process around 2 GB/s per core. With the use of special instructions that's doable.

It has to be at least partially multithreaded. A single core cannot calculate SHA-256 fast enough to saturate the memory bus; 72 cores should be enough. What would be faster: calculate that single bucket byte on 72 cores and then split the data, or do the splitting on multiple cores as well? That needs measurements.

Yes. If you hit main memory during the sort, you would create such crazy pressure on memory that you may, again, fall back to slower-than-single-threaded territory.

Then you don't need SHA-256; you can use aeshash, which Go uses on most modern CPUs, and you can safely assume there would be no collisions at all.

In that case it may even be beneficial to create significantly more buckets, to ensure that each (absent an attack) would fit in L2. Since an attack can only make them extra-large on the input, not on the output, it should be pretty hard to bring your server down.

I should have phrased this better. What I meant to say is: "for simplicity of analysis, let us make this slightly unrealistic assumption", not "this strategy requires this assumption"

Step 3:

======

It is not obvious to me how to make this step multithreaded. The problem I see here: suppose we have a multithreaded solution; then we have to deal with one of two difficult problems:

Choice 1: Buckets need to be concurrent-safe, because more than one thread might write to them. Whether locking or lock-free, the cost here seems non-trivial.

Choice 2: Each thread keeps its own local set of buckets, but then afterwards we need to run a step which merges the buckets from the different threads.

To use a hash to split into buckets in parallel with 100 threads, you can do this:

  1. Create a 100x100 matrix of vectors.
  2. Each thread is assigned a row in the matrix and a piece of the original array. It copies numbers from the piece it was assigned into the appropriate vectors in its row.
  3. Then when all have finished, the threads are assigned columns instead. They concatenate all of the vectors in the column and process those numbers.
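A minimal sketch of that two-phase matrix scheme, using std scoped threads. The bucket_hash below is a stand-in mixer rather than the seeded/cryptographic hash discussed above, dedup_by_buckets is just an illustrative name, and sort + dedup is used as one possible way to process each bucket:

use std::thread;

// Placeholder bucket assignment; an adversary-resistant version would be seeded.
fn bucket_hash(x: &(u64, u64), buckets: usize) -> usize {
    let mixed = (x.0 ^ x.1.rotate_left(32)).wrapping_mul(0x9E37_79B9_7F4A_7C15);
    (mixed >> 32) as usize % buckets
}

fn dedup_by_buckets(t: &[(u64, u64)], threads: usize) -> Vec<(u64, u64)> {
    // Phase 1: each thread fills its own row of the matrix, so no locking is needed.
    let rows: Vec<Vec<Vec<(u64, u64)>>> = thread::scope(|s| {
        t.chunks(t.len().div_ceil(threads).max(1))
            .map(|chunk| {
                s.spawn(move || {
                    let mut row = vec![Vec::new(); threads];
                    for &x in chunk {
                        row[bucket_hash(&x, threads)].push(x);
                    }
                    row
                })
            })
            .collect::<Vec<_>>()
            .into_iter()
            .map(|handle| handle.join().unwrap())
            .collect()
    });

    // Phase 2: each thread owns one column: concatenate it, then sort + dedup.
    let parts: Vec<Vec<(u64, u64)>> = thread::scope(|s| {
        (0..threads)
            .map(|col| {
                let rows = &rows;
                s.spawn(move || {
                    let mut bucket: Vec<(u64, u64)> =
                        rows.iter().flat_map(|row| row[col].iter().copied()).collect();
                    bucket.sort_unstable();
                    bucket.dedup();
                    bucket
                })
            })
            .collect::<Vec<_>>()
            .into_iter()
            .map(|handle| handle.join().unwrap())
            .collect()
    });

    // Buckets are disjoint by construction, so concatenation is the final answer.
    parts.concat()
}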

Assuming x86, this should be row-major to avoid cache line contention in the first pass, right? Or does it not matter here since the items are already 16 bytes wide...

The matrix just contains the vectors themselves, so it's really small, and row vs. column doesn't matter in this case.