Rayon: accept anything that can be converted into IndexedParallelIterator

TLDR

How do I write a function that accepts anything that can be converted into a rayon::iter::IndexedParallelIterator?

Details

I have a prototype function that looks roughly like this:

fn doit(data: &[XXX], job_size: usize) -> T {
    data
        .par_iter()
        .with_min_len(job_size)
        .with_max_len(job_size)
        ...
}

The with_{min,max}_len calls are needed because creating and combining the
accumulators is rather expensive, so we need to inhibit rayon's tendency to
create many small sub-jobs.

Passing in &[XXX] is OK for prototyping purposes, but in real life we have
cases where the data don't fit into memory. So we want to replace the &[XXX]
parameter with an iterator, which I can get to work along these lines

fn doit<F, A, R, T, D>(
    data: D,
    fold_op: F,
    redc_op: R,
    job_size: usize,
) -> A
where
    F: Fn(A, &T) -> A + Send + Sync,
    A: Send + Sync + Default,
    R: Send + Sync + Fn(A, A) -> A,
    for <'a> &'a D: IntoParallelIterator<Item = &'a T>,
{
    data
        .par_iter()
        //.with_min_len(job_size)
        .fold  (A::default, fold_op)
        .reduce(A::default, redc_op)
}

at the cost of losing with_{min,max}_len: these require
rayon::iter::IndexedParallelIterator.

In many cases we do know exactly how many data there will be, so we can
implement std::iter::ExactSizeIterator to generate them, but I don't see a
link between this and IndexedParallelIterator, and there doesn't seem to be an
IntoIndexedParallelIterator.

How about

for<'a> &'a D: IntoParallelIterator<Item = &'a T>,
<<&'a D> as IntoParallelIterator>::Iter: IndexedParallelIterator

?

Your suggestion needed a bit of massaging:

       <<&'a D> as IntoParallelIterator>::Iter: IndexedParallelIterator // original
for<'a> <&'a D  as IntoParallelIterator>::Iter: IndexedParallelIterator // changed

but then it seems to work:

fn doit<F, A, R, T, D>(
    data: D,
    fold_op: F,
    redc_op: R,
    job_size: usize,
) -> A
where
    F: Fn(A, &T) -> A + Send + Sync,
    A: Send + Sync + Default,
    R: Send + Sync + Fn(A, A) -> A,
    for<'a>  &'a D: IntoParallelIterator<Item = &'a T>,
    for<'a> <&'a D as IntoParallelIterator>::Iter: IndexedParallelIterator
{
    data
        .par_iter()
        .with_min_len(job_size)
        .fold  (A::default, fold_op)
        .reduce(A::default, redc_op)
}

Something is still not quite right.

Parameters of type impl IntoIterator<Item = T> make it possible to accept

  1. Anything that can be turned into the required kind of iterator
  2. ... which trivially includes something that already is the required kind of iterator.

fn foo<T>(_: impl IntoIterator<Item = T>) { todo!() }

fn bar() {
    let not_iterator: Vec<u32> = (0..100).collect();
    let yes_iterator = not_iterator.clone().into_iter();
    foo(not_iterator);
    foo(yes_iterator);
}

But I'm failing to reproduce this kind of generality in the case of the IndexedParallelIterator.

Can it be done?

But that's not what you're doing. You have a bound for something more like Iterable that produces a shared iterator without taking ownership, which we approximate with a for<'any> &'any D: IntoParallelIterator<...> bound. And while an iterator produced via IntoIterator is trivially an IntoIterator to itself, it's not necessarily true that an iterator produced via an iterable is itself iterable.

This doesn't work with non-Rayon iterators either.

If you write out what it might look like, it's basically Clone:

impl<I: Iterator> IntoIterator for &I {
    type IntoIter = I;
    type Item = I::Item;
    // vvv same signature as Clone::clone, given Self = &I
    fn into_iter(self) -> Self::IntoIter {
        // ???
    }
}

And we can't have that due to coherence issues.


For this, you should not use par_iter(), because that is always a conversion from a reference. Instead just use into_par_iter(), and let the caller choose to pass in &iterable or existing_iterator.

fn doit<'a, F, A, R, T: 'a, D>(
    data: D,
    fold_op: F,
    redc_op: R,
    job_size: usize,
) -> A
where
    F: Fn(A, &T) -> A + Send + Sync,
    A: Send + Sync + Default,
    R: Send + Sync + Fn(A, A) -> A,
    D: IntoParallelIterator<Item = &'a T>,
    D::Iter: IndexedParallelIterator,
{
    data
        .into_par_iter()
        .with_min_len(job_size)
        .fold  (A::default, fold_op)
        .reduce(A::default, redc_op)
}

BTW, for this pattern where you're immediately following it with a fold, you might like fold_chunks, which was added in rayon 1.6.0.


This sorts out some confusions and blind spots I had. Thanks.

However, I still don't see how to link the IndexedParallelIterator bound to non-rayon iterators which know how many elements they will produce.

As a toy example consider a Below(N) iterable, which generates the same values as 0..N:

fn main() {
    let iter = Below(101);
    let vec = Vec::from_iter(iter);
    //dbg!(doit(iter, fadd, radd, 10)); // Can this be made to work?
    dbg!(doit(&vec, fadd, radd, 10));

    fn fadd(a: u32, b: &u32) -> u32 { a + b }
    fn radd(a: u32, b:  u32) -> u32 { a + b }
}

Is there a way to use something like Below(...) in doit

  • without having to gather all the data into memory at once (as is done in vec above)
  • while keeping fold_chunks in the implementation of doit?

playground

The non-rayon Iterator implementations are nearly irrelevant. Apart from par_bridge() (which doesn't translate indexed/exact-size properties), rayon::iter traits are independent. You could impl IntoParallelIterator for Below though, and the easiest way to do that is to just wrap the related Range implementation.

impl IntoParallelIterator for Below {
    type Item = u32;
    type Iter = rayon::range::Iter<u32>;
    fn into_par_iter(self) -> Self::Iter {
        (0..self.0).into_par_iter()
    }
}

If you want to hide that detail with a BelowParIter newtype, you can, but then you'll also have to implement ParallelIterator and IndexedParallelIterator too.

This still doesn't work with doit as written, because that wants Item = &T, and range-like iterators don't have backing memory to hand out references like that. Maybe for that, you can be even more generic and take whatever D::Item provides:

fn doit<F, A, R, D>(
    data: D,
    fold_op: F,
    redc_op: R,
    job_size: usize,
) -> A
where
    F: Fn(A, D::Item) -> A + Send + Sync,
    A: Send + Sync + Default,
    R: Send + Sync + Fn(A, A) -> A,
    D: IntoParallelIterator,
    D::Iter: IndexedParallelIterator,
{
    data
        .into_par_iter()
        .fold_chunks(job_size, A::default, fold_op)
        .reduce     (          A::default, redc_op)
}

Then the call needs to be adjusted to use radd in both places:

    dbg!(doit(iter, radd, radd, 10));
    dbg!(doit(&vec, fadd, radd, 10));

Ah yes, that's much better. I got too bogged down in details to spot this.

My bad: the whole point of Below being a toy example is to avoid spamming the discussion with the complex details of my real use cases, which, of course, are NOT mere wrappers around things for which IntoParallelIterator is already implemented. I apologize for not having made this sufficiently clear.

It's not a question of wanting to hide that detail: that detail is not there at all in the real cases.

Perhaps it is worthwhile to spam the discussion with some domain-specific details: my real iterators are constructed from specifications of volumes in 3D Euclidean space; they generate regularly-distributed points in those volumes, and lines joining all possible pairs of those points, with filters applied to the latter.

Which hints at a problem: why does fold_chunks (and with_{min,max}_len) require indexed iterators? Or rather, is there a way of relaxing this requirement? [But see below: I suspect I've misunderstood what 'indexed' means, so this question is probably misguided.]

In my real-world case, I could implement logic that maps an index to a corresponding point or line, but I'd rather not as it complicates the implementation at least a little. More importantly, once the filter is applied to the generated lines, I lose the ability to maintain a mapping between index and element.

  • I don't care about the order in which I get these points and lines, but

  • I do care about avoiding many small sub-jobs (because of the huge cost of creating and combining accumulators)

Is there some way of telling rayon to avoid small jobs without the iterator being indexed?

So, this is the way to go, rather than ParallelBridge or IntoParallelIterator?

The required methods seem to be:

  • ParallelIterator
    • drive_unindexed
  • IndexedParallelIterator
    • len
    • drive
    • with_producer

which leads me to suspect that I've misunderstood the meaning of 'indexed': I thought that 'indexed' meant

has the ability to produce the Nth item in the sequence, on demand

but does it actually mean

knows how many elements will be produced in total

?

If so, then my earlier question becomes: What will go wrong if an IndexedParallelIterator only knows the approximate number of items?

IndexedParallelIterator is documented as:

An iterator that supports “random access” to its data, meaning that you can split it at arbitrary indices and draw data from those points.

For something like enumerate, that lets us split the job at some index i, so we can enumerate 0..i on one thread and i..len on another, and those will be split further as needed. For zip, it lets us split two different iterators at the same index i and keep a 1:1 correspondence. Or collect_into_vec can split its destination into separate precise slices for the items that will be written.

Notice that it's not just about counting. For example, a HashSet knows exactly how many items it contains, but if its iterator has a reference to the raw table, and I want to split at i = len / 2, there's no easy way for it to find that since the items are hash-randomly placed.

Then those methods will misbehave. There won't be any memory unsafety or other UB, but it's otherwise unspecified -- e.g. they could produce incorrect results or panic. For fold_chunks, it probably just means that the chunks will be too big or small in practice, but I'm not making any promises about that. :slight_smile:

We don't really have a fuzzier notion like Iterator::size_hint. A long time ago we had BoundedParallelIterator, but we didn't have a good use for that, so we removed it. See rayon#229 for details.

For a more approximate number of items, where you still want some control of the lower bound, you could implement your own ParallelIterator -- as you get into the "plumbing" you'll see that UnindexedProducer::split can return None if it can't split further (or doesn't want to). There is also a function rayon::iter::split to implement this idea at a higher level.


Hmm, it increasingly feels like my use case is struggling against what fits rayon naturally. Even if I implement IndexedParallelIterator, sticking a filter downstream is going to wipe out all the random access knowledge.

Recall that I have use-cases where creating and combining the accumulators is very expensive, so I need to inhibit rayon's tendency to create small sub-jobs; this is why I'm interested in fold_chunks whence my need for indexed. But by chopping the whole job up into essentially fixed-sized jobs, I'm forgoing rayon's work-stealing goodies. Consequently, in cases like this, might it be better not to use rayon at all?

What if I were to spawn N threads, and round-robin (or some other simple strategy) incoming items between them? What other rayon benefits would I be losing?

First, it's totally fine if Rayon doesn't fit your needs! If it's causing you more friction than benefit, and you don't need interop with other code using rayon, then rolling your own may be best.

When you don't care about order at all, a sequential iterator fed into .par_bridge() might work for you. The current bridge implementation is pretty simplistic, just a Mutex around the iterator, and each thread takes the lock to pull an item on demand. As long as you have enough real work after that to keep busy, then the lock contention shouldn't be too bad. You can use a simple fold in this case, and that will be operating per-thread without any further subdivision.

You can also use the Rayon pool without bothering with the iterator APIs, making your own calls to join, scope, broadcast, etc. You can even do that directly from rayon-core to trim your build.

Is the following correct?

In order to implement IndexedParallelIterator, it is necessary to implement

  • std::iter::ExactSizeIterator
  • std::iter::DoubleEndedIterator
  • rayon::iter::plumbing::Producer (code name for splittable-iterator)
  • rayon::iter::ParallelIterator

Additionally, implementing the required methods of the parallel iterators will require use of

  • rayon::iter::plumbing::Consumer
  • rayon::iter::plumbing::UnindexedConsumer
  • rayon::iter::plumbing::bridge
  • rayon::iter::plumbing::ProducerCallback

[Aside: if the README, in its discussion of Producer had included the words

A Producer is effectively a “splittable IntoIterator”.

(which are the first words in the docs of Producer itself), this would have saved me a lot of time!

Also, in the same README, after a lot of head scratching, I conclude that in the sentence

Unlike producers, there are two variants of consumers.

the first word should be replaced with "Like" or "Just like". Is this correct, or have I missed the point of the sentence? (If the former, then it's another annoying speed bump in the README.)

]

It looks like I might be able to get decent mileage out of IndexedParallelIterators that are Cartesian products of ranges (or of other IndexedParallelIterators in general).

Before I try to implement these the long and hard way, I'd like to check whether

  • something like it already exists
  • there are some shortcuts that might avoid some of the work relative to a naive implementation
  • I'm failing to spot some fundamental problem with the whole idea

The direct type implementing IndexedParallelIterator only needs to implement ParallelIterator as well. The rest may be needed internally on ancillary types, depending on how you actually approach the implementation.

If possible, the easiest approach is to delegate your fn with_producer to some combination of existing indexed iterators. A trivial example is Once, which just delegates to Some(self.iter).into_par_iter(). Another is Either, which matches the Left or Right variant. There's a more complicated example in range::Iter<char> here, which uses a macro to encapsulate a map and possibly a chain (to get around the surrogate gap) -- but all that lets it avoid a bespoke Producer etc.

Ah, sorry, that should have been updated way back in rayon#170. There used to be no UnindexedProducer, so the consumers were "unlike" that by having two modes, but yeah that should be "like" now.

You can write an unindexed Cartesian product like:

(0..x).into_par_iter().flat_map(|ix| {
    (0..y).into_par_iter().map(move |iy| (ix, iy))
})

... but flat_map prevents that from being indexed, just like a normal Iterator::flat_map is not exact-sized.

It might be overkill to try to make it all indexed though -- if you're just looking for the parallelism control of with_min_len or fold_chunks, then you may be happy enough without inner parallelism:

(0..x).into_par_iter().flat_map_iter(|ix| {
    (0..y).map(move |iy| (ix, iy)) // a regular `Iterator`
})

That will divide 0..x in parallel, but then process (ix, 0..y) sequentially. You can also apply with_min_len before the flat_map_iter to further control the 0..x parallelism.

Or maybe you don't even want to flat-map that at all:

(0..x).into_par_iter().for_each(|ix| {
    for iy in 0..y {
        process(ix, iy);
    }
});

[Thank you for your prompt, patient and informative replies, they are very helpful.]

I have a two-step process:

  1. generate/obtain data,
  2. process them.

Originally step 1 only ever involved reading a large but tractable dataset into memory, so step 2 could be fed a &[Datum] and use .par_iter() on it.

Now I need to address a second variety of step 1 which

  • is embarrassingly parallel so into_par_iter and friends work like a charm,
  • produces more data than fit in memory.

If I use par_iter in step 1, I don't know how to feed the resulting parallel iterator into step 2. If the generated data fit in memory, it would be a simple matter of collecting, in order to turn it back into a sequential iterable, but they don't fit and the only solution I've found so far is to not parallelize step 1, forgoing the huge performance benefits that provides.

Are there any strategies for feeding parallel iterators into parallel iterators without collecting the data in between?

Edit ... notes:

  • The order in which step 1 generates the data and step 2 consumes them is irrelevant.
  • Step 2 uses accumulators which are expensive to create and expensive to combine, which is why with_min_len/fold_chunks were important upthread.

What's feeding that data production? Can you parallelize that at a higher level? e.g. splitting files, offset ranges, etc.

  1. Divide a 3D volume up into (lots of) elements.
  2. Find all pairs of elements, avoiding double-counting.
  3. Filter interesting pairs.
  4. Create one datum from each pair of points.

Roughly/conceptually something like this:

let points: Vec<Point3D> = calculate_element_positions(...);
let points = &points; // avoid moving points into the closure
(0..points.len())
    .into_par_iter()
    .flat_map_iter(|i| (i..points.len())
                   .filter(move |&j|     is_interesting(points[i], points[j]))
                   .map   (move | j| make_32_byte_datum(points[i], points[j])))

... in order to split step 1 into chunks of data that fit into memory? Then feed those separately into step 2, and combine the results of the separate step-2 runs.

I don't see any obvious problem with that. There might be some complications relating to reuse of step 2 in other contexts. Let's give it a go ...

One problem is that the proportion of data filtered will vary strongly from region to region, so splitting the data into chunks before the filter is likely to lead to chunks with very unbalanced sizes coming out after the filter.