use rayon::iter::Either;
use rayon::prelude::*;
struct Object {
// data
}
fn parse_line_with_comma(_line: &[u8]) -> Vec<Object> {
// This can be length 1 to 9
vec![Object {}, Object {}]
}
fn parse_line_without_comma(_line: &[u8]) -> Object {
Object {}
}
fn parse_line(line: &[u8]) -> impl ParallelIterator<Item = Object> {
let n = std::cmp::min(10, line.len());
if line[..n].contains(&b',') {
let lines: Vec<Object> = parse_line_with_comma(line);
Either::Left(lines.into_par_iter())
} else {
let line: Object = parse_line_without_comma(line);
Either::Right(rayon::iter::once(line))
}
}
fn main() {
// This is read from a file and can be > 100MB
let data = vec![0u8; 1000];
// This can run in parallel
let objects: Vec<_> = data
.par_split(|&c| c == b'\n')
// ??? This does not perform well
.flat_map(parse_line)
.collect();
// let new_objects = serial_process(objects.into_iter()).collect()
// let result = new_objects.into_par_iter().fold(...).reduce(...)
}
The basic idea is that each line that's parsed can generate either 1 or many Object's (where many is at most 9). I'm trying to optimize for the more common case of one line result. If I run my code like this on a file that only returns lines that parse to one object, this runs about 50% slower than if I do a map and have my code only take the first object of parse_line_with_comma (which would be incorrect but was done for the sake of comparison). I suspect its the overhead of the nested parallel iterator which I don't really need.
I also tried creating an enum OneOrMany with a Vec for many and implemented an iterator that flattens it. This performs fine for the case where its mostly One results but I suspect it won't be great if there are a lot of Many results due to pointer chasing. And if i store the values inline in OneOrMany, performance isn't good, presumably due to the enum object being signficantly larger (and mostly empty).
I just haven't figured a clean way to solve the problem. Any help would be appreciated.
Rayon's flat_map is not great for short sequences, because each iterable will essentially go through another round of task division. This is especially wasteful when you're just turning around to collect it.
For the example you've given, I think I'd just avoid the parallel flat_map altogether, and instead collect a Vec<Either<..>>, which you can lazily flat_map when feeding to the serial_process part.
If you dig even deeper, you'll see that variable-length collect::<Vec<_>> actually does a fold+reduce into a LinkedList<Vec<_>> and then flattens that serially into the final Vec<_>. You could skip that middleman too, something like:
let objects = data
.par_split(|&c| c == b'\n')
.fold(Vec::new, |mut vec, line| {
if ... {
vec.extend(parse_line_with_comma(line));
} else {
vec.push(parse_line_without_comma(line));
}
vec
})
.map(|vec| {
let mut list = LinkedList::new();
list.push_back(vec);
list
})
.reduce(LinkedList::new, |mut list1, mut list2| {
// one of the rare reasons to use a list is O(1) append
list1.append(&mut list2);
list1
});
let new_objects = serial_process(objects.into_iter().flatten()).collect();
Thanks @cuviper. That worked like a charm. I hadn't thought to use fold partially since in my head it wouldn't necessarily keep the order even though I know I read the documentation that told me otherwise. Its also probably the first time I've seen a LinkedList in a recommended solution
One more question if you don't mind. In the final commented line of code, i take what is an Iterator and feed it back into rayon to process in parallel again. I may be able to make what I call the serial process more parallel, though that requires a lot more thought since the processing very much depends on the current state. But are there any other alternatives than .collect().into_par_iter() and .par_bridge().into_par_iter() to transition from an Iterator back to a ParallelIterator. In testing, collect().into_par_iter() performs better, so I'm planning on just leaving that in, but I was curious if there was another approach.
The advantage of .par_bridge() (noting that further .into_par_iter() is redundant) is that it can stream results, feeding a little bit into the thread pool while you're still working on more serially. It does come with its own overhead though, and the bridge also does not preserve order at all.
Otherwise, it's fine to collect first. It just becomes a synchronization point of sorts, where nothing else can proceed until that's done.