Conditionally Switch Between Rayon Parallel and Sequential Iteration Based on Collection Size?

I'm using Rayon to process a vector like this:

myvec.par_iter().filter_map(/* ... */).collect();

This works well. However, I'd like to automatically fall back to sequential iteration (iter()) when myvec has fewer than 5 items. Ideally, I want something like:

myvec.conditional_iter(5).filter_map(/* ... */).collect();

Does Rayon provide built-in functionality for this? I implemented custom types/traits to handle filter_map/collect, but I’ll need to re-implement most Iterator methods. I’m wondering if I missed an existing solution.

Rayon can be slower for vectors with only a few elements. But with so few elements, the operation should take only a few nanoseconds, so perhaps we should not really care?

That's exactly my point? I want to switch to iter() when the length is less than n.

They're saying it will only take a few ns with Rayon because the number of elements is so small, so why try to optimize for this? Is it because you have a very large number of iterations, each with a very small number of elements?

Oh, I get it now. Yes, essentially n is less than 5 most of the time but can be very large in rare instances. For those instances, I've found Rayon can give 70-80% time savings, so there is value in using it, but only then. For the rest, it is actually quite the overhead and will kill any savings.

But back to Stefan's question: this is happening inside another loop of 10k+ operations, so these nanoseconds add up.

Consider using .par_chunks(5) instead. This should give you better performance in both sequential and parallel cases, if 5 items is little enough work that you want it sequential.
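
For the original pipeline, a minimal sketch of that suggestion might look like this (the slice type and the filter_map closure are placeholders for whatever the real code does):

use rayon::iter::ParallelIterator;
use rayon::slice::ParallelSlice;

fn process(myvec: &[u64]) -> Vec<u64> {
    myvec
        // Split the slice into chunks of at most 5 items; chunks run in parallel.
        .par_chunks(5)
        // Each chunk is drained by an ordinary sequential iterator.
        .flat_map_iter(|chunk| chunk.iter().filter_map(|&x| (x % 2 == 0).then_some(x * 2)))
        .collect()
}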

A better alternative to the par_chunks suggestion would be with_min_len, which tells Rayon not to split the iterator into parts smaller than n items. If an iterator is not split it will run on the current thread, just like you wanted.
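
With the same placeholder closure, a sketch of that approach:

use rayon::iter::{IndexedParallelIterator, IntoParallelRefIterator, ParallelIterator};

fn process(myvec: &[u64]) -> Vec<u64> {
    myvec
        .par_iter()
        // Never split into pieces of fewer than 5 items; a short input stays
        // in one piece, which Rayon then runs on the current thread.
        .with_min_len(5)
        .filter_map(|&x| (x % 2 == 0).then_some(x * 2))
        .collect()
}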

I think with_min_len would be a better choice. I think par_chunks still runs the tokio reactor, per my profiling. Also, I have learnt that neither of these functions returns an Iterator, if one is looking for compatibility with that type! I ended up implementing this manually:

use rayon::iter::{IntoParallelRefIterator, ParallelIterator};

pub(crate) trait SmartIter<T: Send + Sync> {
    /// Sequential iteration for up to `n` items, parallel iteration otherwise.
    fn smart_iter(&self, n: usize) -> SmartIterator<'_, T>;
}

impl<T: Send + Sync> SmartIter<T> for [T] {
    fn smart_iter(&self, n: usize) -> SmartIterator<'_, T> {
        if self.len() <= n {
            SmartIterator::Sequential(self.iter())
        } else {
            SmartIterator::Parallel(self.par_iter())
        }
    }
}

/// Either a plain slice iterator or Rayon's parallel slice iterator.
pub(crate) enum SmartIterator<'a, T: Send + Sync> {
    Sequential(std::slice::Iter<'a, T>),
    Parallel(rayon::slice::Iter<'a, T>),
}

/// The result of `filter_map`, wrapping whichever adapter was chosen.
pub(crate) enum SmartFilterMap<'a, T: Send + Sync, F> {
    Parallel(rayon::iter::FilterMap<rayon::slice::Iter<'a, T>, F>),
    Sequential(std::iter::FilterMap<std::slice::Iter<'a, T>, F>),
}

impl<'a, T: Send + Sync> SmartIterator<'a, T> {
    pub(crate) fn filter_map<B: Send + Sync, F>(self, f: F) -> SmartFilterMap<'a, T, F>
    where
        F: Fn(&'a T) -> Option<B> + Send + Sync,
    {
        match self {
            SmartIterator::Parallel(iter) => SmartFilterMap::Parallel(iter.filter_map(f)),
            SmartIterator::Sequential(iter) => SmartFilterMap::Sequential(iter.filter_map(f)),
        }
    }
}

impl<'a, T: Send + Sync, B: Send + Sync, F> SmartFilterMap<'a, T, F>
where
    F: Fn(&'a T) -> Option<B> + Send + Sync,
{
    pub(crate) fn collect(self) -> Vec<B> {
        match self {
            SmartFilterMap::Parallel(iter) => iter.collect(),
            SmartFilterMap::Sequential(iter) => iter.collect(),
        }
    }
}
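
A hypothetical call site, mirroring the original pipeline (the closure is a stand-in):

let myvec: Vec<u64> = (0..100).collect();
// Sequential for 5 or fewer items, parallel otherwise.
let doubled: Vec<u64> = myvec
    .smart_iter(5)
    .filter_map(|&x| (x % 2 == 0).then_some(x * 2))
    .collect();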

Rayon does not run the tokio reactor...

This post contains significant errors. See below.

It is not, in fact, better. Using chunks with an interior sequential (normal Iterator) iteration allows the compiler to unroll, SIMDify, and otherwise optimize that sequential loop. with_min_len can't, or at least doesn't in practice, provide that advantage (presumably because each iteration still goes through code compiled so that it could split at any point). To prove this, I wrote a benchmark program:

Benchmark
use criterion::{Criterion, criterion_main};
use rayon::iter::{IndexedParallelIterator, IntoParallelRefIterator, ParallelIterator as _};
use rayon::slice::ParallelSlice;

criterion_main!(bench_main);
fn bench_main() {
    let mut c = Criterion::default().configure_from_args();
    benches(&mut c, "arithmetic", |x| x + 1);
    benches(&mut c, "sin", |x| ((x as f64).sin() * 100.0) as u64);
    benches(&mut c, "string", |x| x.to_string().parse().unwrap());
}

fn benches(
    c: &mut Criterion,
    label: &str,
    per_element_operation: impl Fn(u64) -> u64 + Send + Sync + Copy,
) {
    let mut g = c.benchmark_group(label);

    let data: Vec<u64> = (0..100_000).collect();
    let chunk_size = 4096;

    g.bench_function("naive", |b| {
        b.iter(|| {
            data.par_iter()
                .copied()
                .map(per_element_operation)
                .sum::<u64>()
        });
    });
    g.bench_function("with_max_len", |b| {
        b.iter(|| {
            data.par_iter()
                .copied()
                .with_max_len(chunk_size)
                .map(per_element_operation)
                .sum::<u64>()
        });
    });
    g.bench_function("par_chunks", |b| {
        b.iter(|| {
            data.par_chunks(chunk_size)
                .map(|slice| {
                    slice
                        .iter()
                        .copied()
                        .map(per_element_operation)
                        .sum::<u64>()
                })
                .sum::<u64>()
        });
    });
}

These are my results (Apple M4 Max CPU):

arithmetic/naive        time:   [124.53 µs 125.00 µs 125.48 µs]
arithmetic/with_max_len time:   [123.30 µs 124.29 µs 125.24 µs]
arithmetic/par_chunks   time:   [44.726 µs 45.004 µs 45.258 µs]

sin/naive               time:   [166.90 µs 167.69 µs 168.63 µs]
sin/with_max_len        time:   [166.87 µs 167.49 µs 168.15 µs]
sin/par_chunks          time:   [78.157 µs 78.750 µs 79.257 µs]

string/naive            time:   [358.32 µs 360.07 µs 362.47 µs]
string/with_max_len     time:   [359.89 µs 361.51 µs 363.66 µs]
string/par_chunks       time:   [322.54 µs 323.41 µs 324.34 µs]

par_chunks() always wins, by a little or a lot. Rayon's parallelism is easy to use, but it's not zero-cost; you can get much better results when you arrange your program to take advantage of both parallelism and sequential loop optimizations, i.e. multiple CPU cores and the best throughput available from each core.

(Then why does with_min_len() exist? I don't know, but where I expect it might provide an advantage is with an operation like parallel fold(), where you have some initialization work to do in each split-off job; in that case, with_min_len() might avoid the costs of excessive subdivision, if not the intrinsic execution overhead.)
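
A minimal sketch of that fold() pattern, assuming a plain u64 sum where the identity closure stands in for the per-job initialization work:

use rayon::iter::{IndexedParallelIterator, IntoParallelRefIterator, ParallelIterator};

fn total(data: &[u64]) -> u64 {
    data.par_iter()
        // The identity closure runs once per split-off job, so insist that
        // each job covers at least 1024 items.
        .with_min_len(1024)
        .fold(|| 0u64, |acc, &x| acc + x)
        .sum()
}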

I was profiling and it "pollutes" the output with threads, so I just assumed it was Tokio.

Thanks for the detailed response. I think we are measuring two different things here. My issue is that when the iterator is too small, I'd like to skip Rayon. par_chunks does have a cost, but I re-measured and it is the closest to not using Rayon at all. Still, my workload (inside the iteration) is not evenly distributed like the benchmark sample, and it is not possible to predict the compute time until you run it.

Your benchmark is using with_max_len though, not with_min_len, so you are preventing Rayon from going to the sequential iteration phase while there are more than chunk_size elements left, rather than forcing it to do so when there are at most chunk_size elements left.

I would also suggest trying different lengths for data to see how adaptive each solution is.

No, rayon works by first splitting the iterator a number of times (enough to feed all the threads, while respecting the limits set by with_min_len and with_max_len), and then it runs each resulting chunk through a sequential iterator (from the Iterator trait!).
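
Conceptually, the model is something like this simplified sketch (not rayon's actual code):

fn drive<T: Sync>(piece: &[T], min_len: usize, process: &(impl Fn(&T) + Sync)) {
    if piece.len() > min_len {
        // Split and hand both halves to the pool; an idle worker may steal one.
        let (left, right) = piece.split_at(piece.len() / 2);
        rayon::join(
            || drive(left, min_len, process),
            || drive(right, min_len, process),
        );
    } else {
        // Below the splitting threshold: drain the piece with a plain
        // sequential Iterator on the current thread.
        piece.iter().for_each(process);
    }
}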

Oops. I somehow got the impression that there was only a with_max_len function, so when I was assembling the benchmark code, I neglected to look further than with_m*.

Here are the corrected results, plus a sweep over chunk sizes. Note that the numbers generally get worse with larger chunks once we no longer have enough parallel pieces for all cores (100,000 / 16384 ≈ 6.1), but interestingly, not on the arithmetic test!

arithmetic/naive               time:   [112.61 µs 113.85 µs 115.15 µs]
arithmetic/with_min_len(4)     time:   [98.758 µs 99.913 µs 100.94 µs]
arithmetic/with_min_len(128)   time:   [63.479 µs 63.982 µs 64.447 µs]
arithmetic/with_min_len(4096)  time:   [41.051 µs 41.690 µs 42.644 µs]
arithmetic/with_min_len(16384) time:   [37.018 µs 37.398 µs 37.703 µs]
arithmetic/par_chunks(4)       time:   [115.70 µs 116.83 µs 117.96 µs]
arithmetic/par_chunks(128)     time:   [69.250 µs 69.957 µs 70.636 µs]
arithmetic/par_chunks(4096)    time:   [44.089 µs 44.448 µs 44.778 µs]
arithmetic/par_chunks(16384)   time:   [33.127 µs 33.254 µs 33.375 µs]    # best

sin/naive                      time:   [164.52 µs 165.44 µs 166.43 µs]
sin/with_min_len(4)            time:   [137.77 µs 138.58 µs 139.28 µs]
sin/with_min_len(128)          time:   [98.357 µs 98.694 µs 99.061 µs]
sin/with_min_len(4096)         time:   [93.748 µs 94.326 µs 94.877 µs]
sin/with_min_len(16384)        time:   [124.95 µs 125.22 µs 125.47 µs]
sin/par_chunks(4)              time:   [152.82 µs 153.38 µs 153.99 µs]
sin/par_chunks(128)            time:   [106.90 µs 107.30 µs 107.71 µs]
sin/par_chunks(4096)           time:   [78.868 µs 79.498 µs 80.022 µs]    # best
sin/par_chunks(16384)          time:   [98.518 µs 98.844 µs 99.180 µs]

string/naive                   time:   [353.79 µs 355.52 µs 357.60 µs]
string/with_min_len(4)         time:   [327.12 µs 328.16 µs 329.27 µs]
string/with_min_len(128)       time:   [287.64 µs 289.21 µs 291.51 µs]
string/with_min_len(4096)      time:   [435.65 µs 436.60 µs 437.59 µs]
string/with_min_len(16384)     time:   [677.53 µs 679.26 µs 681.13 µs]
string/par_chunks(4)           time:   [344.15 µs 345.12 µs 346.10 µs]
string/par_chunks(128)         time:   [285.85 µs 286.56 µs 287.34 µs]    # best, barely
string/par_chunks(4096)        time:   [291.20 µs 293.47 µs 296.16 µs]
string/par_chunks(16384)       time:   [485.19 µs 485.86 µs 486.63 µs]

par_chunks is still the best given an ideal chunk size, but this clearly demonstrates that I was wrong about the idea that with_min_len() can't achieve similar results.

Code
use criterion::{Criterion, criterion_main};
use rayon::iter::{IndexedParallelIterator, IntoParallelRefIterator, ParallelIterator as _};
use rayon::slice::ParallelSlice;

criterion_main!(bench_main);
fn bench_main() {
    let mut c = Criterion::default().configure_from_args();
    benches(&mut c, "arithmetic", |x| x + 1);
    benches(&mut c, "sin", |x| ((x as f64).sin() * 100.0) as u64);
    benches(&mut c, "string", |x| x.to_string().parse().unwrap());
}

fn benches(
    c: &mut Criterion,
    label: &str,
    per_element_operation: impl Fn(u64) -> u64 + Send + Sync + Copy,
) {
    let mut g = c.benchmark_group(label);

    let data: Vec<u64> = (0..100_000).collect();
    let chunk_sizes = [4, 128, 4096, 16384];

    g.bench_function("naive", |b| {
        b.iter(|| {
            data.par_iter()
                .copied()
                .map(per_element_operation)
                .sum::<u64>()
        });
    });
    for chunk_size in chunk_sizes {
        g.bench_function(format!("with_min_len({chunk_size})"), |b| {
            b.iter(|| {
                data.par_iter()
                    .copied()
                    .with_min_len(chunk_size)
                    .map(per_element_operation)
                    .sum::<u64>()
            });
        });
    }
    for chunk_size in chunk_sizes {
        g.bench_function(format!("par_chunks({chunk_size})"), |b| {
            b.iter(|| {
                data.par_chunks(chunk_size)
                    .map(|slice| {
                        slice
                            .iter()
                            .copied()
                            .map(per_element_operation)
                            .sum::<u64>()
                    })
                    .sum::<u64>()
            });
        });
    }
}

I would also suggest trying different lengths for data to see how adaptive each solution is.

I agree this would be useful to distinguish the effects of the chunk size's relationship to the workload from those of its relationship to the data length. I haven't gotten around to investigating it because it'd be a lot more data, and I want to acknowledge my previous error promptly.

I am surprised by this because such an implementation strategy will be inefficient given items of greatly varying individual cost. I suppose that it works well enough because such items are uncommon and are likely to have their own internal parallelism (such as a nested iterator). If I’m going to say things about rayon, I suppose I should read more of its code…

The issue is that you need to balance the overhead of concurrency with the loss of parallelism due to some items having greater cost than others. There's no definitive answer because a generic library does not have enough knowledge to pick the best one in every situation. This is also why with_min_len and with_max_len exist, to allow users to inform the library of choices it should make in particular cases. For example if you expect some items to be much slower to process than others you might want to use .with_max_len(1) to limit the batching, while if you know that you'll have a varying number of items but they will all be very fast to process then you'll want to use .with_min_len(N) to always batch at least N items together.
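
A sketch of those two extremes (Job, run, and the numeric workload are hypothetical stand-ins):

use rayon::iter::{IndexedParallelIterator, IntoParallelRefIterator, ParallelIterator};

// Hypothetical unit of work whose cost varies wildly from item to item.
struct Job(u64);
impl Job {
    fn run(&self) -> u64 {
        self.0 // stand-in for slow, unpredictable work
    }
}

// Varying costs: with_max_len(1) keeps every item individually stealable.
fn run_all(jobs: &[Job]) -> Vec<u64> {
    jobs.par_iter().with_max_len(1).map(Job::run).collect()
}

// Many uniformly cheap items: with_min_len(256) batches at least 256 per job.
fn sum_cheap(nums: &[u64]) -> u64 {
    nums.par_iter().with_min_len(256).map(|&x| x.wrapping_mul(3)).sum()
}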
