Can't figure out how to parallelize an iterator

I'm trying to parallelize an iterator from osmpbfreader crate, but can't understand what's missing.

The reader object reads blobs from a protobuf file and produces its own structures. It has its own par_iter, but if you map onything on it, it'll run in the main thread. Rayon's par_iter or par_bridge don't work on this iterator.

The examples from Rayon docs are very basic, usually working with vectors. Those, I could reproduce without isssues. More complex examples run threads manually, which is not quite suitable in my case (I need a parallelized reduce).

Some investigation:

The built-in par_iter does parallelization internally, but any .map-like methods on it are executed in the same main thread.

Test of threading:

let file = File::open(&Path::new("../osmtest/ilyicha-1.osm.pbf")).unwrap();
let mut reader = OsmPbfReader::new(file);
for _obj in reader.par_iter().map(|o| {
        println!("{:?} {:?}", thread::current().id(), o);
        Some(1)
    }) { 
    // nothing is done in the for loop
}

Output:

ThreadId(1) Ok(/* skipping the data here  */)
ThreadId(1) Ok(...)
ThreadId(1) Ok(...)
ThreadId(1) Ok(...)
ThreadId(1) Ok(...)
ThreadId(1) Ok(...)
ThreadId(1) Ok(...)
ThreadId(1) Ok(...)
ThreadId(1) Ok(...)

When I do the same for simple Rayon demos, thread id's are different.

So I tried rewriting. As Rayon docs suggest, I just add .par_iter() to the call chain:

reader.par_iter().par_iter().map(|o| { println!("{:?} {:?}", thread::current().id(), o); Some(1) })

It gives compilation error:

the method `par_iter` exists for struct `ParIter<'_, File>`, but its trait bounds were not satisfied
method cannot be called on `ParIter<'_, File>` due to unsatisfied trait bounds
note: src/main.rs:46: the following trait bounds were not satisfied:
`&ParIter<'_, File>: IntoParallelIterator`
which is required by `ParIter<'_, File>: rayon::iter::IntoParallelRefIterator`

Reading the IntoParallelIterator doc, I can't figure out what I need to implement on the iterator.

When I tried reader.par_iter().par_bridge().map(...), the error was:

`rayon::iter::Map<IterBridge<ParIter<'_, File>>, [closure@src/main.rs:46:49: 46:114]>` is not an iterator
required because of the requirements on the impl of `IntoIterator` for `rayon::iter::Map<IterBridge<ParIter<'_, File>>, [closure@src/main.rs:46:49: 46:114]>`
required by `std::iter::IntoIterator::into_iter`

Another deeper documentation page discusses mostly issues with indexed iterators, which are not suitable for a stream from a file.

What am I missing?

1 Like

The par_iter from the reader parallelizes over blobs. So if your file only has one blob, it will only run one thread.

1 Like

I checked if it's single-threaded, but no. If I call reader.iter().map(..., it loads only 1 core, if I call reader.par_iter().map(..., it loads all of them. For some reason the method in map() afterwards seems to run in one thread.

1 Like

Apparently, here's where ParIter comes from:

pub_iterator_type! {
    #[doc="Parallel iterator on the `OsmObj` of the pbf file."]
    ParIter['a, R] = par_map::FlatMap<Blobs<'a, R>,
                                      blobs::OsmObjs,
                                      fn(Result<Blob>) -> blobs::OsmObjs>
    where R: io::Read + 'a
}

par_map is another crate from the same author. So ParIter is a type from there, not from Rayon.

1 Like

I took par_map that's used in osmpbfreader and made it work in parallel, but

  • there's no parallel reduce in that trait,
  • there are N+N threads, N for blob reader, N for par_map, 2x more than logical cores.

Any advice on how to implement Rayon's par_iter on an un-indexed iterator is welcome.


The test code:

use par_map::ParMap;
// ...
let result:Vec<i64> = reader.par_iter().par_map(|o| {
    println!("{:?}", thread::current().id());
o.unwrap().way().and_then(|w| Some(w.id.0))
}).filter_map(|i| i).collect();

I suspect you're trying to using it in a for loop, but unfortunately for loops only support normal iterators, and rayon's parallel iterators are not iterators (i.e. they don't implement IntoIterator, and even if they did that would not allow them to be parallel). Use instead methods like Parallelterator::for_each.

1 Like

You could give rayon::iter::ParallelBridge - Rust a try.
Not perfect but it should parallelize your map.

1 Like

Oh, now I see what I was missing. I saw this code example before, but it worked on a simple structure (Vec), rather than a non-indexable iterator, like mine. So I assumed that my errors (no method or trait not satisfied) were because of complex requirements. But it turned out I just missed the use statement. Looking at this example today, it was immediately clear.

use rayon::iter::ParallelBridge;
use rayon::prelude::ParallelIterator;
use std::sync::mpsc::channel;

let rx = {
    let (tx, rx) = channel();

    tx.send("one!");
    tx.send("two!");
    tx.send("three!");

    rx
};

let mut output: Vec<&'static str> = rx.into_iter().par_bridge().collect();
output.sort_unstable();

assert_eq!(&*output, &["one!", "three!", "two!"]);

I made a test code, and calling par_iter on my iterator worked, and reduce was working in different threads.

So, the only problem was missing the trait that was not imported.

1 Like

@culebron I have been running into exactly the same issue with osmpbfreader, and also cannot figure out how to use it with Rayon. I need to compare its performance with the osmpbf reader. In osmpbf I simply call BlobReader::from_path(pbf_file)?.par_bridge().for_each_with(...) to process each block.

Have you been able to figure out how to do it, and if so, can you post an example?

My goal is to re-implement PlaneTiler Java app in Rust, so I have been evaluating Rust' performance with the planetiler author using experiments.

In case this is still relevant: pariter - Rust

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.