I tried a few different ideas (the code below also runs on the Rust Playground).
The code:
/// Alternative ways to concatenate a `&[Vec<Event>]` into a single `Vec<Event>`,
/// kept side by side so they can be benchmarked against each other.
// Not every variant is exercised by `main` (e.g. `rayon_owned`), so dead-code
// warnings are silenced for the whole module.
#[allow(dead_code)]
mod implementations {
    use std::mem::MaybeUninit;

    /// One event record. Every field is `Copy`, so whole slices of events can be
    /// bulk-copied.
    #[derive(Debug, Clone, Copy)]
    pub struct Event {
        pub x: u16,
        pub y: u16,
        pub time: i64,
        pub intens: u16,
    }

    impl Event {
        /// Builds an event from its four fields.
        pub fn new(x: u16, y: u16, time: i64, intens: u16) -> Self {
            Self { x, y, time, intens }
        }
    }

    /// Total number of events across all inner vectors.
    fn count_items(vecs: &[Vec<Event>]) -> usize {
        vecs.iter().map(Vec::len).sum()
    }

    /// The original version: one allocation sized for the total, then one
    /// `extend_from_slice` per inner vector.
    pub fn merge(hit_vecs: &[Vec<Event>]) -> Vec<Event> {
        let mut out_vec: Vec<Event> = Vec::with_capacity(count_items(hit_vecs));
        for vec in hit_vecs {
            out_vec.extend_from_slice(vec);
        }
        out_vec
    }

    /// `[T]::join` with an empty separator.
    pub fn join(hit_vecs: &[Vec<Event>]) -> Vec<Event> {
        hit_vecs.join([].as_slice())
    }

    /// `[T]::concat`, the standard-library equivalent of `merge`.
    pub fn concat(hit_vecs: &[Vec<Event>]) -> Vec<Event> {
        hit_vecs.concat()
    }

    /// Rayon's parallel `flatten` followed by `collect`.
    pub fn rayon(hit_vecs: &[Vec<Event>]) -> Vec<Event> {
        use rayon::prelude::*;
        hit_vecs.par_iter().flatten().copied().collect()
    }

    /// Owned-input variant of `rayon`. Measured to perform the same as the
    /// borrowed version.
    pub fn rayon_owned(hit_vecs: Vec<Vec<Event>>) -> Vec<Event> {
        use rayon::prelude::*;
        hit_vecs.into_par_iter().flatten().collect()
    }

    /// Copies each vector in `vecs`, in order, into consecutive parts of `out`.
    ///
    /// # Panics
    /// Panics unless `out.len()` equals the total length of `vecs`. A normal
    /// return therefore means every element of `out` was initialized.
    fn run_thread(vecs: &[Vec<Event>], mut out: &mut [MaybeUninit<Event>]) {
        for v in vecs {
            let next = out
                .split_off_mut(..v.len())
                .expect("out slice was too small");
            next.write_copy_of_slice(v);
        }
        assert!(out.is_empty(), "out slice was too large");
    }

    /// Splits `hit_vecs` into at most one contiguous group per available core.
    /// Every group except the last is copied on its own scoped thread; the last
    /// is copied on the calling thread. All copies go straight into the output's
    /// uninitialized capacity.
    pub fn manual_thread(hit_vecs: &[Vec<Event>]) -> Vec<Event> {
        let total_len = count_items(hit_vecs);
        let mut out_vec: Vec<Event> = Vec::with_capacity(total_len);
        // Fall back to a single thread instead of panicking when the platform
        // cannot report its parallelism.
        let cores = std::thread::available_parallelism().map_or(1, |n| n.get());
        // `div_ceil` gives at most `cores` groups. `.max(1)` keeps `chunks` from
        // panicking on empty input. Using `chunks` also handles a short final group,
        // where a fixed-size `split_off` would return `None`.
        let vecs_per_core = hit_vecs.len().div_ceil(cores).max(1);
        // `with_capacity` may over-allocate, so only hand out the first `total_len` slots.
        let mut out_slice = &mut out_vec.spare_capacity_mut()[..total_len];
        std::thread::scope(|scope| {
            let mut groups = hit_vecs.chunks(vecs_per_core);
            let last = groups.next_back().unwrap_or(&[]);
            for vecs in groups {
                let out = out_slice
                    .split_off_mut(..count_items(vecs))
                    .expect("out slice was too small");
                scope.spawn(move || run_thread(vecs, out));
            }
            // run_thread will panic if out_slice is not exactly the right size
            run_thread(last, out_slice);
        });
        // SAFETY: `out_slice` was exactly the first `total_len` slots of `out_vec`'s
        // allocation, so `total_len <= capacity`. It was partitioned into disjoint
        // pieces, one per `run_thread` call, and each call asserts that it filled its
        // whole piece. `thread::scope` joins every spawned thread and re-raises any
        // panic, so reaching this line means all `total_len` elements are initialized.
        unsafe { out_vec.set_len(total_len) };
        out_vec
    }

    /// Same partitioning as `manual_thread`, but the groups run as tasks on rayon's
    /// current thread pool. The split is sized to that pool, so changing its
    /// thread count (e.g. via `RAYON_NUM_THREADS`) changes the split too.
    pub fn rayon_spawn(hit_vecs: &[Vec<Event>]) -> Vec<Event> {
        let total_len = count_items(hit_vecs);
        let mut out_vec: Vec<Event> = Vec::with_capacity(total_len);
        let threads = rayon::current_num_threads();
        // See `manual_thread` for why `.max(1)` and `chunks` are used.
        let vecs_per_thread = hit_vecs.len().div_ceil(threads).max(1);
        // `with_capacity` may over-allocate, so only hand out the first `total_len` slots.
        let mut out_slice = &mut out_vec.spare_capacity_mut()[..total_len];
        rayon::scope(|scope| {
            let mut groups = hit_vecs.chunks(vecs_per_thread);
            let last = groups.next_back().unwrap_or(&[]);
            for vecs in groups {
                let out = out_slice
                    .split_off_mut(..count_items(vecs))
                    .expect("out slice was too small");
                scope.spawn(move |_| run_thread(vecs, out));
            }
            // run_thread will panic if out_slice is not exactly the right size
            run_thread(last, out_slice);
        });
        // SAFETY: `out_slice` was exactly the first `total_len` slots of `out_vec`'s
        // allocation, and it was partitioned into disjoint pieces that each
        // `run_thread` call asserts it filled completely. `rayon::scope` waits for
        // every spawned task and propagates any panic, so all `total_len` elements
        // are initialized here.
        unsafe { out_vec.set_len(total_len) };
        out_vec
    }

    /// Spawns one rayon task per inner vector, each copying into its own slice of
    /// the output. This is the simplest of the parallel versions.
    pub fn rayon_spawn_easy(hit_vecs: &[Vec<Event>]) -> Vec<Event> {
        let total_len = count_items(hit_vecs);
        let mut out_vec: Vec<Event> = Vec::with_capacity(total_len);
        let mut out_slice = &mut out_vec.spare_capacity_mut()[..total_len];
        rayon::scope(|scope| {
            for v in hit_vecs {
                let next = out_slice
                    .split_off_mut(..v.len())
                    .expect("out slice was too small");
                scope.spawn(move |_| {
                    next.write_copy_of_slice(v);
                });
            }
        });
        // SAFETY: the `next` slices are consecutive, disjoint, and their lengths sum
        // to `total_len` (which fits in the capacity). Each is fully written by
        // `write_copy_of_slice`, whose source has the same length by construction.
        // `rayon::scope` only returns once every spawned task has finished, and it
        // propagates any panic.
        unsafe { out_vec.set_len(total_len) };
        out_vec
    }
}
use implementations::*;
use rand::{Rng, RngExt, SeedableRng};
/// Builds one vector of pseudo-random events per entry in `counts`.
///
/// The RNG is seeded with a fixed value, so repeated runs produce identical input.
/// Events are drawn from a single RNG stream, in order: all of the first vector,
/// then all of the second, and so on.
fn generate(counts: impl Iterator<Item = usize>) -> Vec<Vec<Event>> {
    let mut rng = rand::rngs::Xoshiro256PlusPlus::seed_from_u64(139835);
    let mut batches = Vec::new();
    for count in counts {
        let batch: Vec<Event> = (0..count).map(|_| random_event(&mut rng)).collect();
        batches.push(batch);
    }
    batches
}
/// Draws one event with every field sampled from `rng`.
/// Fields are drawn in the order `x`, `y`, `time`, `intens`.
fn random_event(mut rng: impl Rng) -> Event {
    let x: u16 = rng.random();
    let y: u16 = rng.random();
    let time: i64 = rng.random();
    let intens: u16 = rng.random();
    Event::new(x, y, time, intens)
}
/// Order-sensitive fingerprint of `vec`, used to compare every implementation's
/// output against `merge`'s.
///
/// Each field is folded in with a multiply-then-add step. Unlike a plain sum,
/// this means that emitting the right events in the wrong order, or mixing up
/// two fields, will almost always change the result. `time` is reinterpreted as
/// `u64` bit-for-bit; arithmetic wraps.
fn checksum(vec: Vec<Event>) -> u64 {
    // Odd multiplier (the 64-bit FNV prime), so each step is a bijection mod 2^64.
    const MULTIPLIER: u64 = 0x0000_0100_0000_01B3;
    vec.iter()
        .flat_map(|event| {
            [
                u64::from(event.x),
                u64::from(event.y),
                event.time as u64,
                u64::from(event.intens),
            ]
        })
        .fold(0u64, |acc, prop| acc.wrapping_mul(MULTIPLIER).wrapping_add(prop))
}
/// Benchmarks every concatenation strategy on the same generated input. For each
/// strategy it prints the best time over several runs and checks every run's
/// output against `merge`.
fn main() {
    if cfg!(debug_assertions) {
        eprintln!("warning: debug build; timings are not meaningful, run with --release");
    }
    // Build rayon's global pool up front, so its start-up cost isn't charged to
    // whichever rayon-based implementation happens to run first.
    rayon::ThreadPoolBuilder::new().build_global().unwrap();
    // smaller limit so the playground doesn't OOM
    let inner = 16 * 1000 * 1000 / size_of::<Event>();
    let outer = 8;
    let vecs = generate(std::iter::repeat_n(inner, outer));
    // Reference result that every implementation must reproduce.
    let answer = checksum(merge(&vecs));
    // A single timing is at the mercy of scheduler and allocator noise, so report
    // the best of several runs.
    const RUNS: u32 = 5;
    macro_rules! functions {
        ($($f:ident),* $(,)?) => {
            $({
                let mut best = std::time::Duration::MAX;
                for _ in 0..RUNS {
                    let t = std::time::Instant::now();
                    let res = std::hint::black_box($f(std::hint::black_box(&*vecs)));
                    best = best.min(t.elapsed());
                    assert_eq!(
                        checksum(res),
                        answer,
                        "{} produced different output than merge",
                        stringify!($f)
                    );
                }
                println!("{:>20}: {:?}", stringify!($f), best);
            })*
        };
    }
    functions! {
        merge,
        join,
        concat,
        rayon,
        manual_thread,
        rayon_spawn,
        rayon_spawn_easy,
    }
}
Testing them on my computer with the real data sizes, `merge`, `concat`, and `join` perform identically. I'm not seeing the slowness of `merge` that you reported, which suggests it could be a missed optimization on your target. Rayon's `flatten` version is worse (more than twice as slow), while the three unsafe threading methods are faster and nearly identical to each other. The `rayon_spawn*` versions gradually pull ahead as the data gets smaller. At very small sizes I'd expect the single-threaded versions to win, so you may want to choose a method based on the input size. I would also try different thread counts.
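| Method             | Time     |
|--------------------|----------|
| `merge`            | 375.1 ms |
| `concat`           | 374.2 ms |
| `join`             | 375.5 ms |
| `rayon`            | 823.3 ms |
| `manual_thread`    | 172 ms   |
| `rayon_spawn`      | 168.9 ms |
| `rayon_spawn_easy` | 172.1 ms |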
I would go with `rayon_spawn_easy`, since it's the simplest of the fast methods. Also check whether your constraints really are as strict as they appear:
- Can you make do with returning `impl Iterator<Item = Event>` instead of a `Vec`?
- Can you recycle the `Vec` from a previous call, taking a `&mut Vec<Event>` argument whose capacity is already allocated?
- If other things are using threads at the same time, perhaps don't parallelize this, or parallelize it less.
- If one of the inner `Vec`s already has enough capacity, extend that one instead of allocating a new one.