Hello! I am currently running into performance limitations with parallelization and am seeking advice on how to improve parallel performance.
My use case is:
I iterate over large sequences (>1M elements, total size >1GB) and currently parallelize this with Rayon. The sequence comes from a (potentially infinite) Iterator that is produced by a parser, so I use par_bridge to turn it into a parallel iterator. The whole thing looks like: seq.par_bridge().try_for_each(handle).
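For context, here is a minimal self-contained sketch of that pipeline; parse and handle are hypothetical stand-ins for my actual parser and processing function:

use rayon::prelude::*;

// Hypothetical stand-in for the parser's output iterator.
fn parse() -> impl Iterator<Item = String> + Send {
    (0..1_000_000).map(|i| i.to_string())
}

// Hypothetical stand-in for the per-element processing, which can fail.
fn handle(item: &String) -> Result<(), String> {
    if item.is_empty() { Err("empty item".into()) } else { Ok(()) }
}

fn main() -> Result<(), String> {
    let seq = parse();
    // Bridge the sequential iterator into a parallel one and process each element.
    seq.par_bridge().try_for_each(|item| handle(&item))
}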
I noticed that parallel iteration incurs significant overhead; this seems to come from the total size of the data (and not from the number of elements). To investigate, I created a small test that creates some large data and processes it in parallel (Rust Playground):
#[derive(Clone)]
struct Tree(Vec<Tree>);

impl Tree {
    // Build a complete tree of width `w` and height `h`.
    fn new(w: usize, h: usize) -> Self {
        if h == 0 {
            Self(Vec::new())
        } else {
            Self(vec![Tree::new(w, h - 1); w])
        }
    }
}

fn main() {
    use rayon::prelude::*;
    rayon::ThreadPoolBuilder::new()
        .num_threads(2)
        .build_global()
        .unwrap();
    let trees = (0..25).map(|i| (i, Tree::new(2, i)));
    //let trees = trees.par_bridge(); // <-- uncomment this for benchmark #1
    //let trees = trees.collect::<Vec<_>>().into_par_iter(); // <-- uncomment this for benchmark #2
    trees.for_each(|(i, _tree)| println!("i: {}", i)); // <-- comment this for benchmark #3
    //chan_test(trees) // <-- uncomment this for benchmark #3
}
fn chan_test(iter: impl Iterator<Item = (usize, Tree)>) {
    // Bounded channel so the producer cannot run arbitrarily far ahead of the consumers.
    let (s, r) = crossbeam_channel::bounded(2);
    crossbeam_utils::thread::scope(|scope| {
        scope.spawn(|_| r.iter().for_each(|_| println!("Thread 1")));
        scope.spawn(|_| r.iter().for_each(|_| println!("Thread 2")));
        iter.for_each(|(i, tree)| {
            println!("i: {}", i);
            s.send(tree).unwrap();
        });
        // Close the channel so the consumer threads terminate.
        drop(s);
    })
    .unwrap();
}
The results (evaluated with hyperfine, compiled with cargo build --release) are:
Benchmark #0: sequential
  Time (mean ± σ):      5.194 s ±  0.051 s    [User: 4.744 s, System: 0.443 s]
  Range (min … max):    5.133 s …  5.289 s    10 runs

Benchmark #1: par_bridge()
  Time (mean ± σ):      6.628 s ±  0.660 s    [User: 9.968 s, System: 2.521 s]
  Range (min … max):    5.843 s …  7.177 s    10 runs

Benchmark #2: collect().into_par_iter()
  Time (mean ± σ):      6.006 s ±  0.162 s    [User: 6.916 s, System: 0.907 s]
  Range (min … max):    5.735 s …  6.217 s    10 runs

Benchmark #3: chan_test()
  Time (mean ± σ):      6.747 s ±  0.134 s    [User: 8.452 s, System: 0.433 s]
  Range (min … max):    6.468 s …  6.953 s    10 runs
We can see that all parallel benchmarks (#1, #2, #3) are significantly slower than the sequential one (#0). Furthermore, the runtime of benchmark #1 fluctuates considerably.
Is there a way to process my data concurrently and faster, potentially without channels? Note that when processing an element of my sequence, I only need a reference to the element (here, &Tree).
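To make that last point concrete, a hypothetical handler with the access pattern I have in mind; it only ever needs a shared reference, never ownership:

fn handle(tree: &Tree) {
    // Read-only traversal; the Tree itself never needs to be moved or cloned.
    let _width = tree.0.len();
}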