Parallel iterator execution without job stealing (no rayon)

We would like to execute a function on items emitted by an iterator in parallel, in the order in which they are emitted by the iterator (so no job stealing, no rayon); the functions have some results that must be folded.

This can easily be done by having a thread send the jobs to a Crossbeam channel and having a pool of threads extract the jobs from the channel and execute them; the functions then send their results to another channel, where they are gathered by yet another thread. But we were wondering whether there's some crate that already does that.
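For concreteness, here is a minimal, self-contained sketch of that design, using `std::sync::mpsc` in place of Crossbeam so it compiles without dependencies (Crossbeam's `Receiver` is `Clone`, so the `Arc<Mutex<…>>` below would not be needed there). The function name and the squaring "job" are illustrative, not from any crate:

```rust
use std::sync::{mpsc, Arc, Mutex};
use std::thread;

/// Runs a job on every item of `iter` using `n_workers` threads that pick
/// jobs in emission order, then folds the results with a commutative sum.
fn parallel_sum<I>(iter: I, n_workers: usize) -> u64
where
    I: Iterator<Item = u64> + Send + 'static,
{
    // Bounded job channel: the capacity acts as the small buffer that
    // decouples I/O-heavy item generation from the workers.
    let (job_tx, job_rx) = mpsc::sync_channel::<u64>(16);
    // std's Receiver is single-consumer, so we share it behind a Mutex.
    let job_rx = Arc::new(Mutex::new(job_rx));
    let (res_tx, res_rx) = mpsc::channel::<u64>();

    // Producer: feeds jobs in iterator order.
    let producer = thread::spawn(move || {
        for item in iter {
            job_tx.send(item).unwrap();
        }
        // job_tx is dropped here: workers see a closed channel and stop.
    });

    // Worker pool: each worker takes the next pending job.
    let workers: Vec<_> = (0..n_workers)
        .map(|_| {
            let rx = Arc::clone(&job_rx);
            let tx = res_tx.clone();
            thread::spawn(move || loop {
                // Lock only long enough to receive one job.
                let job = { rx.lock().unwrap().recv() };
                match job {
                    Ok(x) => tx.send(x * x).unwrap(), // the "job" is squaring
                    Err(_) => break,
                }
            })
        })
        .collect();
    drop(res_tx);

    producer.join().unwrap();
    for w in workers {
        w.join().unwrap();
    }
    // Collector: fold in arrival order (fine for a commutative fold).
    res_rx.iter().sum()
}

fn main() {
    println!("{}", parallel_sum(0..100, 4));
}
```

Jobs are still picked up in emission order, because every worker pulls the next item from the same channel.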

I know you say no rayon, but the crate that does that is rayon. You can use rayon::spawn if you don't want to rely on rayon's parallel iterator feature.

What do you mean? Executing in parallel will inherently run in an order that's not guaranteed to be the one emitted by the iterator. Or do you just mean you want the final fold to be in that order?

rayon::spawn takes a function that has side effects and no result. We need to fold the results.

We could handle sending the results to a channel inside the function itself, but that is exactly what we wanted to avoid.

Sorry, you're right. I mean that the jobs are submitted for execution in the order in which they are emitted. So threads pick the next job in the order in which they are emitted by the iterator.

There will be some slack in the execution order, but we want to avoid distributing the items across work queues. Jobs close together in the iterator order have good locality, so we want to avoid having them executed in a completely unpredictable order.

This will do what you're asking for:

use rayon::iter::{ParallelBridge, ParallelIterator};

fn run_in_parallel<I, F, T>(ops: I) -> Vec<T>
where
    F: FnOnce() -> T,
    F: Send + 'static,
    T: Send + 'static,
    I: Iterator<Item = F>,
    I: Send + 'static,
{
    // par_bridge() pulls jobs from the serial iterator in emission order
    // and hands them to rayon's thread pool.
    ops.par_bridge()
        .map(|f| f())
        .collect()
}

The use of par_bridge() enforces that tasks are taken from the iterator in the order that the iterator provides them. This is similar to the kind of orderings you get with a channel.

If you really don't want to use rayon for this, then you might be able to use another thread pool. E.g., I'm sure we could abuse Tokio to do it, though it would be unusual to use Tokio in that manner.


We looked into ParallelBridge. The problem is that it does not do any buffering of the items. Item generation in our iterator is somewhat I/O-heavy, and without a buffer we lose a lot of time to contention.

Just to give an idea, ParallelBridge is about 4 times slower than our current solution (which uses a small buffer) on a no-op task.

I apologize: my original post was missing a lot of information that I condensed into "no rayon", but I should have explained in more detail.

This is true, but unlike Rayon’s “indexed” parallel iterators, par_bridge() does not preserve the original order of items. Your code sample’s collect() will see the final elements in a nondeterministic order, whereas @vigna's requirements (if I understand them correctly) are that the items are folded in the original order, not the order they were processed.

This is a tricky problem and I’m not aware of any libraries that directly solve it. I once wrote code to try to recover ordering from an output channel but it is flawed — I think that in order to truly avoid potentially unbounded buffering when a particular item takes a long time, you have to stall other jobs and not let them occupy output buffer capacity until the item you need arrives.

The fold order can be arbitrary: our folding functions are associative and commutative. What you suggest can be done with the pariter crate, though.

Very nice library, but it reads from the source iterator using a lock, much like ParallelBridge, and that is what we do not want to do for the reasons I explained above.

You can use an mpsc or a crossbeam receiver as your iterator. That'll let you have buffering.
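As a sketch of that idea (using `std::sync::mpsc` rather than Crossbeam so it is dependency-free; the `buffered` helper is hypothetical): a producer thread fills a bounded channel, and the returned `Receiver` is itself an `Iterator` with `cap` items of read-ahead buffering, which can then be handed to `par_bridge()` or any other consumer.

```rust
use std::sync::mpsc;
use std::thread;

/// Wraps an I/O-heavy iterator in a producer thread plus a bounded channel,
/// returning a Receiver that implements Iterator with `cap` items of
/// read-ahead buffering.
fn buffered<I>(iter: I, cap: usize) -> mpsc::Receiver<I::Item>
where
    I: Iterator + Send + 'static,
    I::Item: Send + 'static,
{
    let (tx, rx) = mpsc::sync_channel(cap);
    thread::spawn(move || {
        for item in iter {
            if tx.send(item).is_err() {
                break; // consumer hung up
            }
        }
    });
    rx
}

fn main() {
    // The receiver is a plain Iterator, so in principle it could be fed
    // to rayon's par_bridge() instead of being collected serially here.
    let v: Vec<u32> = buffered(0..10, 4).into_iter().collect();
    println!("{:?}", v);
}
```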

That's the solution I described in the original post, and indeed what we're using now.

I was just trying to find a way to delete some code :roll_eyes:.