Fallible conversion with rayon

I'm trying to use par_iter to generate a sequence of values and feel like I'm repeatedly trying to put a square peg into a round hole.

I have a block of code that should run once per element that converts T into U and that is fallible (uses ? all over the place). Since it's fallible and I want all threads to immediately stop if there is an error, my best bet looks like ParallelIterator::try_for_each. However, for some reason try_for_each locks the Output type to Result<(), Error>, so I can't use it for T to U conversion. So then I saw ParallelIterator::while_some, which lets you make iteration stop in response to None, so I could return Some(u) or None, but then I don't propagate the specific error that occurred. Finally I could use try_for_each but capture a mutable ref to some container that I push back onto, but that doesn't work either without wrapping the container in a Mutex because all the threads will be separately pushing onto it. Shouldn't there just be a try_map?

There's gotta be a more natural way to do this? Rayon Parallel Iterator docs

There's filter_map and flat_map, but better than that, there's collect. Like the sequential collect, it can collect into a Result or a Vec of Result or many other things.

I don't see how collect relates to my question? Collect doesn't pay attention to fallibility or fail fast on error.

It does deal with fallibility. Both the standard library sequential collect and rayon's parallel collect will short circuit on the first Err variant if you collect into, for example, a Result<Vec<_>, _>.
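For illustration, here is a minimal sketch of the sequential half of that claim using only the standard library. The `convert` function and its string inputs are hypothetical stand-ins for the fallible T to U conversion in the question; the call counter is there only to make the short circuit visible.

```rust
use std::cell::Cell;
use std::num::ParseIntError;

/// Hypothetical fallible T -> U conversion, used only for illustration.
fn convert(s: &str) -> Result<i32, ParseIntError> {
    s.trim().parse::<i32>()
}

/// Converts every input and also reports how many times `convert` was called.
fn convert_all(inputs: &[&str]) -> (Result<Vec<i32>, ParseIntError>, usize) {
    let calls = Cell::new(0);
    let result = inputs
        .iter()
        .map(|s| {
            calls.set(calls.get() + 1);
            convert(s)
        })
        // Collecting into Result<Vec<_>, _> stops pulling items at the first Err.
        .collect::<Result<Vec<i32>, ParseIntError>>();
    (result, calls.get())
}
```

Calling `convert_all(&["1", "oops", "3"])` returns the parse error after two calls to `convert`; the third element is never touched. With rayon, the same `collect::<Result<Vec<_>, _>>()` call works on a parallel iterator, so in effect `map` plus `collect` is the `try_map` being asked for.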

You'll get more details if you read the documentation (linked above).

collect is highly polymorphic, which causes some discoverability issues, but it's a really good multitool once you know more about the traits it uses.

Hmm, all I see is this sentence:

All of that can also be combined with short-circuiting collection of Result or Option types

But it's not clear to me that this is the fail fast behavior I'm looking for. Say the input sequence is 1000 elements long, and the work is split across 2 threads that each process 500 elements. If the first thread returns Err when processing the first element in its portion, does "short circuiting" mean that thread 2 will process all 500 of its elements, and then collect is smart enough to combine thread 1's and thread 2's results into a single error, or does it mean that it's smart enough to tell thread 2 to stop before it finishes processing all 500 elements? Only try_for_each and try_reduce seem to have this stronger language:

we will attempt to stop processing the rest of the items in the iterator as soon as possible

I find this a little confusing to figure out from the docs because there's no mention of Try or Result in the signature for collect. It just has FromParallelIterator, which doesn't mention either; that in turn only mentions IntoParallelIterator, which has an impl for &Result<T, E>, but looking at the source for that, I don't see anything that obviously looks like it would send some sort of signal to a thread pool.

FromParallelIterator has an impl for Result, with the following docs (you might need to expand):

Collect an arbitrary Result-wrapped collection.

If any item is Err, then all previous Ok items collected are discarded, and it returns that error. If there are multiple errors, the one returned is not deterministic.

So it's not an explicit guarantee that it will stop other threads, but you can infer that it's going to return whichever error it gets first (for a given value of first), and at that point it's an assumption of quality that it cancels any work in progress.

The current implementation delegates to while_some, which has:

Creates an iterator over the Some items of this iterator, halting as soon as any None is found.

which sounds about as good as you're going to get in a parallel implementation.

Oof, reading trait docs is tricky, thanks for the pointer. I later realized I can always put a while_some that checks an AtomicBool in my combinator chain before or after the map that does the actual work to implement my own early exit if I need to. Even better if that's basically what it does already.