Flattening 8 × 400MB Vec<T> takes ~3s. Why is this and how do I parallelise it?

I'm merging 8 large vectors (~400MB each, ~3GB total) into one on Mac OS with 100GB/s memory bandwidth, but it takes around 3 seconds regardless of which method I use.

#[derive(Debug, Clone, Copy)]
pub struct Event {
    x: u16,
    y: u16,
    time: i64,
    intens: u16,
}

pub fn merge(hit_vecs: Vec<Vec<Event>>) -> Vec<Event> {
    let total_len: usize = hit_vecs.iter().map(|v| v.len()).sum();
    let mut out_vec: Vec<Event> = Vec::with_capacity(total_len);
    for vec in &hit_vecs {
        out_vec.extend_from_slice(vec);
    }
    out_vec
}

I've also tried flatten().collect() and append() — all take ~3s. Measured with std::time::Instant around the call site.

With 100GB/s bandwidth, copying 6GB (read + write) should take ~60ms.
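For reference, the estimate can be reproduced with a few lines. This is only a sketch of the arithmetic: `ideal_copy_ms` is a made-up helper name, and the 400 MB and 100 GB/s figures are the ones quoted above. `Event` has 14 bytes of fields, which the 8-byte alignment of `i64` pads to 16.

```rust
use std::mem::size_of;

#[allow(dead_code)]
#[derive(Debug, Clone, Copy)]
pub struct Event {
    x: u16,
    y: u16,
    time: i64,
    intens: u16,
}

/// Size of one element in bytes: 2 + 2 + 8 + 2 = 14, padded to a multiple of 8.
pub fn event_size() -> usize {
    size_of::<Event>()
}

/// Hypothetical helper: lower bound on the copy time in milliseconds,
/// counting one read and one write for every payload byte.
pub fn ideal_copy_ms(payload_bytes: f64, bandwidth_bytes_per_s: f64) -> f64 {
    2.0 * payload_bytes / bandwidth_bytes_per_s * 1e3
}
```

With the quoted figures, `ideal_copy_ms(8.0 * 400e6, 100e9)` comes out at roughly 64 ms, in line with the ~60 ms estimate.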

Why does it take so much longer?
How would I parallelize Vec<Vec<T>> to Vec<T> to decrease run time?
Edit: I compile in release mode.

are you sure you are compiling in release mode? the difference can be rather significant for these scenarios

a likely cause of the slowdown is memory latency rather than throughput, especially since you are likely to have almost constant cache misses. the way to mitigate it is to make sure you are always requesting a few cache lines beyond where you are operating. splitting the workload among threads might also help. an easy way to do that could be having a Box<[MaybeUninit<Event>]> of length = total length. you can then get a &mut to it, split it into mutable slices, each covering the range that is going to host one vec, and then give each slice to a thread to fill

Yes, I compile in release

what would make you think that? there is no reason this should get any cache miss at all.
in fact, in optimized mode, this calls memcpy once per vec, which is the most cache friendly operation you could ask for.
@khade12345 can you check that your asm also produces similar code, with the memcpy calls

i mean yeah, you are using each cache line to the fullest, but you are still loading data that is several times bigger than the whole cache, so you are still going to have a decent amount of misses, since you essentially have 2 sliding windows of cached values at any time

you shouldn't have any cache misses (probably a couple each time you switch to a new vec, but nothing that impacts perf) because the loads are completely linear and thus predictable, so they should get preloaded

btw @khade12345 there is a std method for this, which is called join. have you tried it?
how does it compare?

pub fn merge(hit_vecs: Vec<Vec<Event>>) -> Vec<Event> {
    hit_vecs.join([].as_slice())
}

I have looked at the ASM for the merge function and can see the memcpy line (is this what you meant?):

LBB179_19:
        lsl x2, x24, #4
        add x28, x28, #24
        add x0, x23, x22, lsl #4
        mov x1, x25
        bl _memcpy
        add x22, x22, x24
        str x22, [sp, #24]
        subs x27, x27, #24
        b.ne LBB179_18
        b LBB179_22

I just tried using hit_vecs.join([].as_slice()) and this is much faster ~400ms! Does anyone have an idea why this might be?
Many thanks everyone

yep. if it didn't use memcpy i would have expected it to be much slower, because the copying is what should take 99% of the time.
i have no idea why your implementation ended up so much slower than join though.

the code of join looks the same, if not worse because of the separator

To answer the parallelization question: the simplest thing you can try is rayon.

use rayon::prelude::*;

pub fn merge(hit_vecs: Vec<Vec<Event>>) -> Vec<Event> {
    hit_vecs.into_par_iter().flatten().collect()
}

If you're going to use an empty separator then you might as well use concat instead of join

How long does a second merge into the same target vector take? I'd guess that the expensive part is the OS lazily allocating zeroed pages on first access. If that explanation is correct, a second pass into the same vector would be much faster.

Though that doesn't explain why join might be faster.
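One way to test that guess is to time two passes into the same buffer. The following is a minimal sketch, not a benchmark harness: `merge_into` and `time_two_passes` are names made up here, and a single run says little, so repeat it a few times with realistic sizes.

```rust
use std::time::{Duration, Instant};

#[derive(Debug, Clone, Copy, PartialEq)]
pub struct Event {
    pub x: u16,
    pub y: u16,
    pub time: i64,
    pub intens: u16,
}

/// Hypothetical helper: refill `out` from `parts`, reusing its allocation.
pub fn merge_into(out: &mut Vec<Event>, parts: &[Vec<Event>]) {
    out.clear(); // drops the elements but keeps the capacity
    out.reserve(parts.iter().map(Vec::len).sum());
    for part in parts {
        out.extend_from_slice(part);
    }
}

/// Time a first pass (fresh allocation) and a second pass (same buffer).
pub fn time_two_passes(parts: &[Vec<Event>]) -> (Duration, Duration) {
    let mut out = Vec::new();

    let t = Instant::now();
    merge_into(&mut out, parts);
    let first = t.elapsed();

    let t = Instant::now();
    merge_into(&mut out, parts);
    let second = t.elapsed();

    // Keep the optimizer from discarding the copies.
    std::hint::black_box(&out);
    (first, second)
}
```

If the second duration is much smaller than the first, first-touch page faults on the destination are a plausible explanation; if both are similar, the cost is somewhere else.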

I tried a few different ideas: Rust Playground

The code
#[allow(dead_code)]
mod implementations {
    #[derive(Debug, Clone, Copy)]
    pub struct Event {
        pub x: u16,
        pub y: u16,
        pub time: i64,
        pub intens: u16,
    }

    impl Event {
        pub fn new(x: u16, y: u16, time: i64, intens: u16) -> Self {
            Self { x, y, time, intens }
        }
    }

    fn count_items(vecs: &[Vec<Event>]) -> usize {
        vecs.iter().map(|v| v.len()).sum()
    }

    // The original one
    pub fn merge(hit_vecs: &[Vec<Event>]) -> Vec<Event> {
        let total_len: usize = count_items(hit_vecs);
        let mut out_vec: Vec<Event> = Vec::with_capacity(total_len);
        for vec in hit_vecs {
            out_vec.extend_from_slice(vec);
        }
        out_vec
    }

    pub fn join(hit_vecs: &[Vec<Event>]) -> Vec<Event> {
        hit_vecs.join([].as_slice())
    }

    pub fn concat(hit_vecs: &[Vec<Event>]) -> Vec<Event> {
        hit_vecs.concat()
    }

    pub fn rayon(hit_vecs: &[Vec<Event>]) -> Vec<Event> {
        use rayon::prelude::*;
        hit_vecs.par_iter().flatten().copied().collect()
    }

    // I tested this and it was the same as the borrowed version
    pub fn rayon_owned(hit_vecs: Vec<Vec<Event>>) -> Vec<Event> {
        use rayon::prelude::*;
        hit_vecs.into_par_iter().flatten().collect()
    }

    fn run_thread(vecs: &[Vec<Event>], mut out: &mut [std::mem::MaybeUninit<Event>]) {
        for v in vecs {
            let next = out
                .split_off_mut(..v.len())
                .expect("out slice was too small");
            next.write_copy_of_slice(v);
        }
        assert!(out.is_empty(), "out slice was too large");
    }

    pub fn manual_thread(mut hit_vecs: &[Vec<Event>]) -> Vec<Event> {
        let total_len = count_items(hit_vecs);
        let mut out_vec = Vec::with_capacity(total_len);

        let cores = std::thread::available_parallelism().unwrap().get();
        let vecs_per_core = hit_vecs.len().div_ceil(cores);
        let mut out_slice = out_vec.spare_capacity_mut();

        std::thread::scope(|scope| {
            // run_thread will panic if out_slice is not exactly the right size
            for _i in 1..cores {
                if hit_vecs.is_empty() {
                    return;
                }
                // The last group may be shorter than vecs_per_core
                let take = vecs_per_core.min(hit_vecs.len());
                let vecs = hit_vecs.split_off(..take).unwrap();
                let next_len = count_items(vecs);
                let out = out_slice.split_off_mut(..next_len).unwrap();
                scope.spawn(|| run_thread(vecs, out));
            }
            run_thread(hit_vecs, out_slice);
        });

        // If we got here, it means all run_thread calls worked and wrote all items
        unsafe { out_vec.set_len(total_len) };
        out_vec
    }

    pub fn rayon_spawn(mut hit_vecs: &[Vec<Event>]) -> Vec<Event> {
        let total_len = count_items(hit_vecs);
        let mut out_vec = Vec::with_capacity(total_len);

        let cores = std::thread::available_parallelism().unwrap().get();
        let vecs_per_core = hit_vecs.len().div_ceil(cores);
        let mut out_slice = out_vec.spare_capacity_mut();

        rayon::scope(|scope| {
            // run_thread will panic if out_slice is not exactly the right size
            for _i in 1..cores {
                if hit_vecs.is_empty() {
                    return;
                }
                // The last group may be shorter than vecs_per_core
                let take = vecs_per_core.min(hit_vecs.len());
                let vecs = hit_vecs.split_off(..take).unwrap();
                let next_len = count_items(vecs);
                let out = out_slice.split_off_mut(..next_len).unwrap();
                scope.spawn(|_| run_thread(vecs, out));
            }
            run_thread(hit_vecs, out_slice);
        });

        // If we got here, it means all run_thread calls worked and wrote all items
        unsafe { out_vec.set_len(total_len) };
        out_vec
    }

    pub fn rayon_spawn_easy(hit_vecs: &[Vec<Event>]) -> Vec<Event> {
        let total_len = count_items(hit_vecs);
        let mut out_vec = Vec::with_capacity(total_len);
        let mut out_slice = out_vec.spare_capacity_mut();

        rayon::scope(|scope| {
            for v in hit_vecs {
                let next = out_slice.split_off_mut(..v.len()).unwrap();
                scope.spawn(|_| {
                    next.write_copy_of_slice(v);
                });
            }
        });

        // If we got here, it means all slice writes worked and wrote all items
        unsafe { out_vec.set_len(total_len) };
        out_vec
    }
}

use implementations::*;
use rand::{Rng, RngExt, SeedableRng};

fn generate(counts: impl Iterator<Item = usize>) -> Vec<Vec<Event>> {
    let mut rng = rand::rngs::Xoshiro256PlusPlus::seed_from_u64(139835);
    counts
        .map(|count| {
            std::iter::repeat_with(|| random_event(&mut rng))
                .take(count)
                .collect()
        })
        .collect()
}

fn random_event(mut rng: impl Rng) -> Event {
    Event::new(rng.random(), rng.random(), rng.random(), rng.random())
}

fn checksum(vec: Vec<Event>) -> u64 {
    let mut total: u64 = 0;
    for event in vec {
        for prop in [
            event.x as u64,
            event.y as u64,
            event.time as u64,
            event.intens as u64,
        ] {
            total = total.wrapping_add(prop);
        }
    }
    total
}

fn main() {
    rayon::ThreadPoolBuilder::new().build_global().unwrap();

    // smaller limit so the playground doesn't OOM
    let inner = 16 * 1000 * 1000 / size_of::<Event>();
    let outer = 8;
    let vecs = generate(std::iter::repeat_n(inner, outer));
    let answer = checksum(merge(&vecs));

    macro_rules! functions {
        ($($f:ident),* $(,)?) => {
            $({
                let t = std::time::Instant::now();
                let res = std::hint::black_box($f(std::hint::black_box(&*vecs)));
                println!("{:>20}: {:?}", stringify!($f), t.elapsed());
                assert_eq!(checksum(res), answer);
            })*
        };
    }

    functions! {
        merge,
        join,
        concat,
        rayon,
        manual_thread,
        rayon_spawn,
        rayon_spawn_easy,
    }
}

Testing them on my computer[1] with the actual numbers, merge, concat, and join are all identical. I'm not seeing the slowness of merge that you reported, which suggests it could be a missed optimization on your target. Rayon's flatten is worse, and the three unsafe threading methods are better and nearly identical. The rayon_spawn* versions gradually pull ahead as the data gets smaller, and at very small sizes I'd assume the single-threaded ones are better, so you may want to pick methods depending on the size. I would also try different thread counts.

merge             375.1 ms
concat            374.2 ms
join              375.5 ms
rayon             823.3 ms
manual_thread     172 ms
rayon_spawn       168.9 ms
rayon_spawn_easy  172.1 ms
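To experiment with thread counts and a size cutoff, something along these lines could work. It is a std-only sketch: `merge_sized`, `threads` and `serial_below` are names invented for this example, and the right cutoff has to be measured on the target machine.

```rust
use std::mem::MaybeUninit;

#[derive(Debug, Clone, Copy, PartialEq)]
pub struct Event {
    pub x: u16,
    pub y: u16,
    pub time: i64,
    pub intens: u16,
}

/// Hypothetical dispatcher: serial `concat` below `serial_below` elements,
/// otherwise scoped threads, each filling the output range of one group of inputs.
pub fn merge_sized(parts: &[Vec<Event>], threads: usize, serial_below: usize) -> Vec<Event> {
    let total: usize = parts.iter().map(Vec::len).sum();
    if total < serial_below || threads <= 1 || parts.len() <= 1 {
        return parts.concat();
    }

    let mut out: Vec<Event> = Vec::with_capacity(total);
    let per_thread = parts.len().div_ceil(threads);

    // Pair each group of inputs with the disjoint output range it will fill.
    let mut rest: &mut [MaybeUninit<Event>] = &mut out.spare_capacity_mut()[..total];
    let mut jobs = Vec::new();
    for group in parts.chunks(per_thread) {
        let len: usize = group.iter().map(Vec::len).sum();
        let (mine, tail) = std::mem::take(&mut rest).split_at_mut(len);
        jobs.push((group, mine));
        rest = tail;
    }

    std::thread::scope(|scope| {
        for (group, dst) in jobs {
            scope.spawn(move || {
                let mut offset = 0;
                for part in group {
                    let slots = &mut dst[offset..offset + part.len()];
                    for (slot, ev) in slots.iter_mut().zip(part) {
                        slot.write(*ev);
                    }
                    offset += part.len();
                }
            });
        }
    });

    // SAFETY: the jobs cover 0..total without overlap, each slot was written
    // once, and thread::scope joined every thread before returning.
    unsafe { out.set_len(total) };
    out
}
```

Calling it with `threads` set to 1, 2, 4, ... on the real data should show where the extra threads stop paying off.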

I would go with rayon_spawn_easy since it's the simplest of the fast methods. Also make sure the conditions are as they appear:

  • Can you make do with -> impl Iterator<Item = Event>?
  • Can you recycle the Vec from a previous call, allowing you to take an argument of &mut Vec<Event> with the capacity already allocated?
  • If there are other things using threads at the same time, perhaps don't parallelize this, or parallelize it less.
  • If one of the inner Vecs already has the capacity, use that instead of allocating a new one.

  1. 12 available threads, so in this case each of the 8 Vecs gets its own thread
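Two of the options in the list above are short enough to sketch. `merged_iter` and `merge_into_first` are hypothetical names, and whether either helps depends on how the result is used afterwards.

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct Event {
    pub x: u16,
    pub y: u16,
    pub time: i64,
    pub intens: u16,
}

/// Lazy alternative: nothing is copied into a new Vec; the events are
/// yielded one inner Vec after another.
pub fn merged_iter(parts: Vec<Vec<Event>>) -> impl Iterator<Item = Event> {
    parts.into_iter().flatten()
}

/// Reuse the first inner Vec as the output instead of allocating a new one.
pub fn merge_into_first(parts: Vec<Vec<Event>>) -> Vec<Event> {
    let total: usize = parts.iter().map(Vec::len).sum();
    let mut iter = parts.into_iter();
    let Some(mut out) = iter.next() else {
        return Vec::new();
    };
    // Does not reallocate if the first Vec already has enough capacity.
    out.reserve(total - out.len());
    for part in iter {
        out.extend_from_slice(&part);
    }
    out
}
```

`merge_into_first` only saves work when the first Vec really has spare capacity; otherwise `reserve` reallocates and its contents get moved anyway.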

Your rayon_spawn_easy function is also faster than join on my setup. Recycling the Vec is also a good idea although it didn't provide much speed up in my case.

I don't think I can use impl Iterator<Item = Event> since I sort the entire array wrt the time component later.
The Rayon par iter function is also slower on my setup, maybe because it iterates through all values instead of using memcpy?

it is much more likely that it is because it doesn't preallocate. it is unlikely that the optimizer would miss the memcpy, but rayon does not have any tool to compute the total length from the start, so it is likely forced to reallocate frequently, which could cause all the items that have already been moved to be moved again each time

loop 0
               merge: 9.680603ms
                join: 10.617245ms
              concat: 11.253961ms
               rayon: 33.050206ms
       manual_thread: 7.266874ms
         rayon_spawn: 7.584562ms
    rayon_spawn_easy: 6.26464ms
loop 1
               merge: 9.421817ms
                join: 10.622293ms
              concat: 10.508339ms
               rayon: 29.282741ms
       manual_thread: 6.634194ms
         rayon_spawn: 7.215999ms
    rayon_spawn_easy: 6.901147ms
loop 2
               merge: 11.016946ms
                join: 10.040801ms
              concat: 11.750054ms
               rayon: 26.662975ms
       manual_thread: 7.037183ms
         rayon_spawn: 6.981498ms
    rayon_spawn_easy: 6.904222ms
loop 3
               merge: 10.074704ms
                join: 10.671055ms
              concat: 10.803384ms
               rayon: 26.203912ms
       manual_thread: 6.233571ms
         rayon_spawn: 7.766223ms
    rayon_spawn_easy: 7.664072ms
loop 4
               merge: 9.809966ms
                join: 9.894315ms
              concat: 10.055077ms
               rayon: 26.423476ms
       manual_thread: 6.186804ms
         rayon_spawn: 8.563743ms
    rayon_spawn_easy: 7.286161ms
loop 5
               merge: 9.762918ms
                join: 9.441153ms
              concat: 8.935302ms
               rayon: 32.231146ms
       manual_thread: 6.508638ms
         rayon_spawn: 6.750644ms
    rayon_spawn_easy: 6.370108ms
loop 6
               merge: 9.750955ms
                join: 8.476439ms
              concat: 7.478653ms
               rayon: 25.905932ms
       manual_thread: 8.043635ms
         rayon_spawn: 6.894874ms
    rayon_spawn_easy: 6.136018ms
loop 7
               merge: 9.531443ms
                join: 8.715037ms
              concat: 11.535491ms
               rayon: 22.751821ms
       manual_thread: 6.513818ms
         rayon_spawn: 7.607575ms
    rayon_spawn_easy: 6.181022ms
loop 8
               merge: 7.274829ms
                join: 7.215798ms
              concat: 7.14221ms
               rayon: 23.279814ms
       manual_thread: 6.700259ms
         rayon_spawn: 6.671084ms
    rayon_spawn_easy: 6.23313ms
loop 9
               merge: 9.310657ms
                join: 9.697395ms
              concat: 9.313904ms
               rayon: 26.931611ms
       manual_thread: 6.111542ms
         rayon_spawn: 8.9184ms
    rayon_spawn_easy: 6.699998ms

The results are different on my machine.

Firstly, the first loop might overestimate the elapsed time.

Secondly, on a really fast CPU (e.g., AMD 9955H3D), the times are much lower than expected.

Right, this is not an IndexedParallelIterator after flattening, so it does not get direct parallel collection into the final Vec like you might hope. In fact, it collects a LinkedList<Vec<_>> first, and then serially moves that to the final result, undermining what you were trying to parallelize in the first place.

It's even worse actually, because rayon's flatten also uses parallel job splitting on that inner part, so that collected list of vectors is going to have even more separate chunks than you started with. You can do a little better with that using flatten_iter, so the inner part is kept serial -- but this will still be extra data copying compared to a plain serial join or concat.

The rayon_spawn* implementations are better, though I acknowledge that it's rough having to do it more manually like this.

Sorting it is actually really important. Are all the small vecs already sorted? If not, sorting them in parallel is an option. Then merging them (you did call the function merge which makes sense now) in sorted order might be faster than copying them and then sorting. Or if not, making a custom sort that knows there's 8 sorted segments would be faster than other sort methods.
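If the inner Vecs are each already sorted by time, a k-way merge could look roughly like this. It is a single-threaded sketch using a binary heap; `kway_merge_by_time` is a made-up name, and whether it beats copying and then sorting would have to be measured.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

#[derive(Debug, Clone, Copy, PartialEq)]
pub struct Event {
    pub x: u16,
    pub y: u16,
    pub time: i64,
    pub intens: u16,
}

/// Merge segments that are each sorted by `time` into one sorted Vec.
pub fn kway_merge_by_time(parts: &[Vec<Event>]) -> Vec<Event> {
    let total: usize = parts.iter().map(Vec::len).sum();
    let mut out = Vec::with_capacity(total);

    // Min-heap of (time of next unread event, segment index, position in segment).
    let mut heap: BinaryHeap<Reverse<(i64, usize, usize)>> = parts
        .iter()
        .enumerate()
        .filter_map(|(seg, part)| part.first().map(|ev| Reverse((ev.time, seg, 0))))
        .collect();

    while let Some(Reverse((_, seg, pos))) = heap.pop() {
        out.push(parts[seg][pos]);
        // Queue the next event of the segment we just took from, if any.
        if let Some(next) = parts[seg].get(pos + 1) {
            heap.push(Reverse((next.time, seg, pos + 1)));
        }
    }
    out
}
```

Events with equal times come out in segment order, because the heap entries compare by segment index after the time.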

I've been working on and off on a tool called lst that is ls with some additional features and higher performance. I had the same need: collect all filesystem items (with stat info) via rayon tasks into a Vec that I then can par_sort and print.

To lower the Vec generation cost I built a Bag datastructure, a tree with the leaves holding sub-Vecs with the items of a single directory == single rayon task. The Bag is then flattened at the end, also in parallel (but not done well yet, there's room for improvement). This successfully lowered the cost.

A few additional observations I made:

  • The sorting cost is lower (making the total cost lower) when I move all items from sub-Vecs together rather than make a new Vec of references into the Bag items; probably because the indirection increases the memory traffic / latency during sorting more than what's saved first.

  • I also had the idea to sort all the leaf vectors in the subtasks, then do the merges to get the flat Vec at the end, but it was slower; I suspect because the directories can be of very different sizes, so the merges end up copying/moving items multiple times without much productive work (i.e. when merging a Vec of 10 items with a Vec of 1000 items, 100 items are just copied before 1 item from the other comes in, on average). Whereas par_iter can work with same-sized slices and thus be more 'productive'. Although I only used a merge function that merged 2 sequences, maybe an n-ary merge would work better (but I'm not sure how you would parallelize that).

Have you considered special data structures (trie, probably on Unicode-casefolded filenames)?

To @khade12345, I briefly considered counting sort (split the events into buckets by 16 or so highest-order bits, sort each bucket, merge) but I don't know if the top bits of time even differ in your use case.
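A rough sketch of that bucketing idea, for illustration only: `bucket_sort_by_time` and `time_key` are names invented here, and the bucket index is taken from the raw top bits. If all times share the same high bits, everything lands in one bucket and this degenerates into a plain sort, so real data would probably need the key rescaled to the observed min/max first.

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct Event {
    pub x: u16,
    pub y: u16,
    pub time: i64,
    pub intens: u16,
}

/// Order-preserving map from i64 to u64 (flips the sign bit).
fn time_key(time: i64) -> u64 {
    (time as u64) ^ (1u64 << 63)
}

/// Bucket by the top `bits` bits of the time key, sort each bucket, concatenate.
pub fn bucket_sort_by_time(events: &[Event], bits: u32) -> Vec<Event> {
    assert!((1..=16).contains(&bits), "bits must be in 1..=16");
    let mut buckets: Vec<Vec<Event>> = vec![Vec::new(); 1usize << bits];
    for ev in events {
        let index = (time_key(ev.time) >> (64 - bits)) as usize;
        buckets[index].push(*ev);
    }
    // Buckets are independent, so this loop is the part that could run in parallel.
    for bucket in &mut buckets {
        bucket.sort_unstable_by_key(|ev| ev.time);
    }
    // Bucket order follows time order, so concatenation yields a sorted result.
    buckets.concat()
}
```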

The range of event.time values is much larger than the number of elements in Vec<Event>, so I'm guessing this won't be more efficient.