I'm trying to parallelize an iterator from the `osmpbfreader` crate, but I can't understand what's missing.
The reader object reads blobs from a protobuf file and produces its own structures. It has its own `par_iter`, but if you `map` anything on it, the closure runs in the main thread. Rayon's `par_iter` and `par_bridge` don't work on this iterator.
The examples in the Rayon docs are very basic and usually work with vectors; those I could reproduce without issues. More complex examples spawn threads manually, which doesn't quite suit my case (I need a parallelized `reduce`).
Some investigation:
The built-in `par_iter` does parallelization internally, but any `.map`-like methods on it are executed in the same main thread.
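This is consistent with `ParIter` being an ordinary `Iterator`: `.map` is lazy, and its closure runs inside `next()` on whichever thread drives the loop, even when the items were produced by worker threads. A minimal std-only sketch, assuming a hypothetical `decode_in_background` producer in place of the real reader (this is not osmpbfreader's code):

```rust
use std::sync::mpsc;
use std::thread::{self, ThreadId};

/// Hypothetical stand-in for a reader that decodes on worker threads:
/// each worker "decodes" a chunk of blobs and sends the results, tagged
/// with its own thread id, through a channel.
pub fn decode_in_background(blobs: Vec<u32>, workers: usize) -> mpsc::IntoIter<(ThreadId, u32)> {
    let workers = workers.max(1);
    let chunk_size = ((blobs.len() + workers - 1) / workers).max(1);
    let (tx, rx) = mpsc::channel();
    for chunk in blobs.chunks(chunk_size) {
        let chunk = chunk.to_vec();
        let tx = tx.clone();
        thread::spawn(move || {
            for blob in chunk {
                // The "decoding" work runs on this worker thread.
                let decoded = blob * 2;
                tx.send((thread::current().id(), decoded)).unwrap();
            }
        });
    }
    // Drop the original sender so the receiver ends once all workers finish.
    drop(tx);
    rx.into_iter()
}

/// Consumes the stream with a plain `Iterator::map`. The closure runs inside
/// `next()`, i.e. on the thread that drives the iterator, not on the workers.
pub fn mapper_threads(blobs: Vec<u32>) -> Vec<(ThreadId, ThreadId)> {
    decode_in_background(blobs, 4)
        .map(|(decoder, _value)| (decoder, thread::current().id()))
        .collect()
}
```

Every pair returned by `mapper_threads` has a worker id as its first element and the caller's id as its second.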
Test of threading:
```rust
use osmpbfreader::OsmPbfReader;
use std::fs::File;
use std::thread;

let file = File::open("../osmtest/ilyicha-1.osm.pbf").unwrap();
let mut reader = OsmPbfReader::new(file);
for _obj in reader.par_iter().map(|o| {
    // print the id of the thread that runs this closure
    println!("{:?} {:?}", thread::current().id(), o);
    Some(1)
}) {
    // nothing is done in the for loop
}
```
Output:
```
ThreadId(1) Ok(/* skipping the data here */)
ThreadId(1) Ok(...)
ThreadId(1) Ok(...)
ThreadId(1) Ok(...)
ThreadId(1) Ok(...)
ThreadId(1) Ok(...)
ThreadId(1) Ok(...)
ThreadId(1) Ok(...)
ThreadId(1) Ok(...)
```
When I do the same with simple Rayon demos, the thread IDs differ.
So I tried rewriting it. As the Rayon docs suggest, I added `.par_iter()` to the call chain:
```rust
reader.par_iter().par_iter().map(|o| {
    println!("{:?} {:?}", thread::current().id(), o);
    Some(1)
})
```
It gives a compilation error:
```
the method `par_iter` exists for struct `ParIter<'_, File>`, but its trait bounds were not satisfied
method cannot be called on `ParIter<'_, File>` due to unsatisfied trait bounds
note: src/main.rs:46: the following trait bounds were not satisfied:
`&ParIter<'_, File>: IntoParallelIterator`
which is required by `ParIter<'_, File>: rayon::iter::IntoParallelRefIterator`
```
Reading the `IntoParallelIterator` docs, I can't figure out what I need to implement on the iterator.
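For context on where that bound comes from: Rayon provides `par_iter()` through a blanket impl of `IntoParallelRefIterator` for any `T` where `&T: IntoParallelIterator`, which is why the message talks about `&ParIter<'_, File>`. Collections such as `Vec` and slices satisfy that bound; a plain iterator type generally does not. The std-only sketch below mirrors the same shape with a hypothetical `RefIterExt` trait built on `IntoIterator`, so the bound can be observed without Rayon:

```rust
/// Hypothetical trait with the same shape as Rayon's `IntoParallelRefIterator`:
/// a blanket impl covers every `T`, but only where `&T` is itself convertible
/// into an iterator.
pub trait RefIterExt<'a> {
    type Iter: Iterator;
    fn ref_iter(&'a self) -> Self::Iter;
}

impl<'a, T: 'a + ?Sized> RefIterExt<'a> for T
where
    &'a T: IntoIterator,
{
    type Iter = <&'a T as IntoIterator>::IntoIter;

    fn ref_iter(&'a self) -> Self::Iter {
        // `self` is `&'a T`, which the where-clause makes `IntoIterator`.
        IntoIterator::into_iter(self)
    }
}

// `Vec<i32>` qualifies because `&Vec<i32>: IntoIterator`.
// An iterator type such as `std::ops::Range<i32>` does not, because
// `&Range<i32>` is not `IntoIterator`, so `(0..3).ref_iter()` is rejected
// with an "unsatisfied trait bounds" error of the same form.
```

In other words, the method is keyed on the *reference to a collection*, which is something an iterator that can only be advanced with `next()` doesn't provide.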
When I tried `reader.par_iter().par_bridge().map(...)`, the error was:
```
`rayon::iter::Map<IterBridge<ParIter<'_, File>>, [closure@src/main.rs:46:49: 46:114]>` is not an iterator
required because of the requirements on the impl of `IntoIterator` for `rayon::iter::Map<IterBridge<ParIter<'_, File>>, [closure@src/main.rs:46:49: 46:114]>`
required by `std::iter::IntoIterator::into_iter`
```
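The message suggests that `par_bridge()` itself was accepted and that the complaint comes from the `for` loop: Rayon adaptors such as `rayon::iter::Map` implement `ParallelIterator`, not `std::iter::Iterator`, so they are normally finished with a consuming method like `for_each` or `reduce(identity, op)` instead of being iterated. As a std-only analogy (not how Rayon implements `par_bridge`), the hypothetical `bridge_map_reduce` below lets worker threads pull from one shared sequential iterator and ends in a reduce:

```rust
use std::sync::Mutex;
use std::thread;

/// Std-only analogy for "bridge a sequential iterator, map in parallel, reduce".
/// Workers take turns pulling items from a shared iterator, apply `map_op`
/// without holding the lock, fold locally with `combine`, and the per-worker
/// partial results are combined at the end. The result is one value, not an
/// iterator, so there is nothing left to drive with a `for` loop.
pub fn bridge_map_reduce<I, R, ID, M, C>(
    iter: I,
    workers: usize,
    identity: ID,
    map_op: M,
    combine: C,
) -> R
where
    I: Iterator + Send,
    R: Send,
    ID: Fn() -> R,
    M: Fn(I::Item) -> R + Sync,
    C: Fn(R, R) -> R + Sync,
{
    let source = Mutex::new(iter);
    let (source, map_op, combine) = (&source, &map_op, &combine);
    let partials: Vec<R> = thread::scope(|s| {
        let handles: Vec<_> = (0..workers.max(1))
            .map(|_| {
                let mut acc = identity();
                s.spawn(move || loop {
                    // The guard is a temporary, so the lock is released at the
                    // end of this statement, before `map_op` runs.
                    let next = source.lock().unwrap().next();
                    match next {
                        Some(item) => acc = combine(acc, map_op(item)),
                        None => return acc,
                    }
                })
            })
            .collect();
        handles.into_iter().map(|h| h.join().unwrap()).collect()
    });
    partials.into_iter().fold(identity(), |a, b| combine(a, b))
}
```

With Rayon itself, the corresponding chain would presumably be `reader.par_iter().par_bridge().map(...).reduce(|| ..., |a, b| ...)`, provided the iterator and its items are `Send`.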
Another, more in-depth documentation page mostly discusses issues with indexed iterators, which don't fit a stream read from a file.
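Indexed access is what makes the vector demos easy to parallelize: a slice can be split at any position and the halves processed on different threads, whereas a stream read from a file can only hand out its next item. A std-only sketch of that splitting, with a hypothetical `split_sum` and a fixed split `depth` (Rayon instead uses `join` on a work-stealing thread pool rather than spawning a thread per split):

```rust
use std::thread::{self, ThreadId};

/// Sums `data` by recursively splitting it at its midpoint; each half runs on
/// its own scoped thread until `depth` reaches zero. Returns the sum and the
/// id of every thread that summed a leaf chunk.
pub fn split_sum(data: &[u64], depth: u32) -> (u64, Vec<ThreadId>) {
    if depth == 0 || data.len() < 2 {
        // Leaf: sum this chunk sequentially on the current thread.
        return (data.iter().sum(), vec![thread::current().id()]);
    }
    // Random access by index is what makes this split possible.
    let (left, right) = data.split_at(data.len() / 2);
    thread::scope(|s| {
        let l = s.spawn(move || split_sum(left, depth - 1));
        let r = s.spawn(move || split_sum(right, depth - 1));
        let (left_sum, mut ids) = l.join().unwrap();
        let (right_sum, right_ids) = r.join().unwrap();
        ids.extend(right_ids);
        (left_sum + right_sum, ids)
    })
}
```

With `depth = 2` the data is cut into four chunks, each summed on a different thread, which matches the "different thread IDs" seen in the vector demos.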
What am I missing?