I am experimenting with Rayon’s for_each_init and I noticed some behavior that I want to clarify.
I have a Vec that I iterate using .iter().par_bridge() (which generates an IterBridge) and then call for_each_init. In the init closure I return a Vec<usize>. I observed that:
While there is data to process, each worker executes init once and reuses that buffer to process items.
After all data is processed, the init function may be called one or two more times, but the operation closure (op) is not called for these extra invocations.
Example output with 4 workers:
Get Vector from worker: Some(2)
Get Vector from worker: Some(0)
Get Vector from worker: Some(1)
Get Vector from worker: Some(3)
Worker: Some(1), Buffer: [...]
Worker: Some(3), Buffer: [...]
Worker: Some(0), Buffer: [...]
Worker: Some(2), Buffer: [...]
Get Vector from worker: Some(1)
However, when I use .par_iter().for_each_init(), I notice that init is executed multiple times per worker during processing.
So, my question is: does for_each_init with par_bridge guarantee that each worker executes the init closure at most once while processing all data, or can it be called multiple times per worker, as with .par_iter().for_each_init()?
It sounds like init is called every time a job is split, which means:
When a thread runs out of work and splits another thread's empty queue, it still runs init even though there's nothing to use it on.
The init value isn't shared between different jobs on the same thread.
It says to use with_min_len (in this case, with a value of 25). You could also use par_chunks and deal with slices of items directly. But these reduce rayon's ability to give work to threads that finish early, so if your jobs don't all take the same amount of time, you may want to avoid for_each_init and use a thread-local RefCell instead.
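The thread-local RefCell alternative can be sketched with the standard library alone. This is a minimal sketch under stated assumptions: scoped std threads stand in for rayon's pool (so it runs without the rayon crate), and process, run, and BUFFERS_CREATED are hypothetical names. With rayon you would call process from inside the for_each closure instead; the point is that the buffer belongs to the OS thread rather than to a job, so job splitting never creates extra buffers.

```rust
use std::cell::RefCell;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;

// Counts how many per-thread buffers have been created (one per thread that used BUF).
static BUFFERS_CREATED: AtomicUsize = AtomicUsize::new(0);

thread_local! {
    // Created lazily the first time a thread touches it, then reused for every
    // item that thread processes, regardless of how the work was split into jobs.
    static BUF: RefCell<Vec<usize>> = {
        BUFFERS_CREATED.fetch_add(1, Ordering::SeqCst);
        RefCell::new(Vec::new())
    };
}

// Hypothetical per-item work that needs scratch space.
fn process(item: usize) -> usize {
    BUF.with(|buf| {
        let mut buf = buf.borrow_mut();
        buf.clear(); // keep the allocation, drop the previous contents
        buf.extend(0..item);
        buf.iter().sum::<usize>()
    })
}

// Std-thread stand-in for a parallel for_each: one scoped thread per chunk.
// Returns (sum of results, buffers created during this call).
fn run(items: &[usize], chunk_size: usize) -> (usize, usize) {
    let before = BUFFERS_CREATED.load(Ordering::SeqCst);
    let total = thread::scope(|s| {
        let handles: Vec<_> = items
            .chunks(chunk_size)
            .map(|chunk| s.spawn(move || chunk.iter().map(|&x| process(x)).sum::<usize>()))
            .collect();
        handles.into_iter().map(|h| h.join().unwrap()).sum::<usize>()
    });
    (total, BUFFERS_CREATED.load(Ordering::SeqCst) - before)
}

fn main() {
    let items: Vec<usize> = (0..100).collect();
    let (total, buffers) = run(&items, 25);
    println!("total = {total}, buffers created = {buffers}");
}
```

The RefCell borrow is held only inside process, so it cannot overlap with another item on the same thread unless that code itself calls back into rayon while the buffer is borrowed.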
Just to clarify my question: I understand that init can be called multiple times overall due to job splitting. With par_iter*().for_each_init(), it seems likely that each thread will call the init closure multiple times even while processing items.
However, with par_bridge().for_each_init(), in my experiments each thread only seems to call init once while processing data, and any additional calls only happen after all items have already been processed (and without executing the op closure).
So my question is specifically: when using par_bridge().for_each_init(), is it expected that each thread will execute the init closure at most once while processing items, or is that just incidental behavior that could change?
I believe the reason is that when using par_bridge(), there's no job splitting.
Every thread locks one shared mutex and then calls next() on your iterator.
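A std-only model of that consumption loop makes the "init once per worker" observation concrete. It is a sketch, not rayon's code: scoped std threads stand in for rayon workers, drain_with_init is a hypothetical name, and it models only the mutex-based consumption, not any job splitting that may happen beforehand. Each worker runs its "init" once and then pulls items until next() returns None.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Mutex;
use std::thread;

// Hypothetical model of the par_bridge consumption loop (std threads, not rayon):
// each worker runs `init` once, then repeatedly locks the shared iterator,
// takes one item, releases the lock, and runs `op` with its own buffer.
fn drain_with_init<I>(iter: I, workers: usize) -> (usize, Vec<usize>)
where
    I: Iterator<Item = usize> + Send,
{
    let shared = Mutex::new(iter);
    let init_calls = AtomicUsize::new(0);
    let per_worker = thread::scope(|s| {
        let handles: Vec<_> = (0..workers)
            .map(|_| {
                let (shared, init_calls) = (&shared, &init_calls);
                s.spawn(move || {
                    // "init": called once, before this worker starts pulling items.
                    init_calls.fetch_add(1, Ordering::SeqCst);
                    let mut buf: Vec<usize> = Vec::new();
                    loop {
                        // The guard is a temporary, so the lock is released right after next().
                        let next = shared.lock().unwrap().next();
                        match next {
                            Some(item) => buf.push(item), // "op": reuses the same buffer
                            None => break,                // iterator exhausted
                        }
                    }
                    buf.len()
                })
            })
            .collect();
        handles.into_iter().map(|h| h.join().unwrap()).collect::<Vec<usize>>()
    });
    (init_calls.load(Ordering::SeqCst), per_worker)
}

fn main() {
    let (inits, per_worker) = drain_with_init(0..100, 4);
    println!("init calls: {inits}, items per worker: {per_worker:?}");
}
```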
Thanks! I dug into the source code and can now clarify a few things.
First, you are right: when using par_bridge, each worker locks the shared mutex and repeatedly calls next() on the iterator until all items are processed.
However, before that happens, some job-splitting logic does run internally, so it’s not entirely “no splitting.”
How splitting works in par_bridge
When consuming an IterBridge, Rayon calls drive_unindexed, which creates an IterParallelProducer (with split_count initialized to num_threads) and executes bridge_unindexed. This in turn calls bridge_unindexed_producer_consumer with a Splitter responsible for splitting jobs among workers.
Splitter: can split a job up to log2(num_threads) + 1 times, because its splits counter starts at num_threads and is halved on every split.
That means that if a worker steals a job, it runs bridge_unindexed_producer_consumer with a refreshed Splitter and may try to split again. But if the IterParallelProducer has already reached its split limit, that worker immediately starts consuming the iterator instead of splitting further.
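The Splitter budget can be checked with a small re-implementation. Splitter is private to rayon, so the struct below is a simplified model based on my reading of its source (the counter starts at the thread count, is halved on each split, and is reset to at least the thread count when a job is stolen); details may differ between rayon versions, and max_chain_splits is a hypothetical helper.

```rust
// Simplified model of rayon's internal Splitter (a private type; this is a
// re-implementation for illustration, not rayon's API).
struct Splitter {
    splits: usize,
}

impl Splitter {
    fn new(num_threads: usize) -> Self {
        Splitter { splits: num_threads }
    }

    // A stolen job resets the budget to at least num_threads; otherwise the job
    // may split while the budget is > 0, halving it each time.
    fn try_split(&mut self, stolen: bool, num_threads: usize) -> bool {
        if stolen {
            self.splits = num_threads.max(self.splits / 2);
            true
        } else if self.splits > 0 {
            self.splits /= 2;
            true
        } else {
            false
        }
    }
}

// How many consecutive, non-stolen splits one chain of jobs can make.
fn max_chain_splits(num_threads: usize) -> usize {
    let mut sp = Splitter::new(num_threads);
    let mut n = 0;
    while sp.try_split(false, num_threads) {
        n += 1;
    }
    n
}

fn main() {
    for t in [1, 2, 4, 8, 12] {
        println!("{t} threads -> {} splits", max_chain_splits(t));
    }
}
```

For 4 threads the counter goes 4, 2, 1, 0, allowing 3 splits, which is log2(4) + 1.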
Example: Execution table with 4 workers
Considerations
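If Splitter.splits > 0 (or the job was just stolen): the worker tries to split the job.
If IterParallelProducer.split_count > 0: the worker decrements it and splits by calling bridge_unindexed_producer_consumer twice (one call may go into the StackJob for stealing).
Otherwise: the worker calls producer.fold_with(consumer.into_folder()).complete() and starts consuming.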
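Putting the two counters together, a sequential model (no stealing, so only the halving rule applies) shows how many leaf calls, and therefore init calls, one par_bridge run produces. bridge, try_split, producer_split, and count_init_calls are hypothetical names for this sketch; the real code runs the two halves through join_context on different workers, which changes the order of the leaves but, in this model, not their number.

```rust
// Sequential, single-threaded model of bridge_unindexed_producer_consumer for
// par_bridge (simplification: no stealing). Each leaf stands for one
// producer.fold_with(consumer.into_folder()).complete() call, i.e. one call
// to the for_each_init `init` closure.

// Models the Splitter rule without stealing: split while the budget is > 0, halving it.
fn try_split(splits: &mut usize) -> bool {
    if *splits > 0 {
        *splits /= 2;
        true
    } else {
        false
    }
}

// Models IterParallelProducer::split: succeeds while the shared counter can be decremented.
fn producer_split(split_count: &mut usize) -> bool {
    match split_count.checked_sub(1) {
        Some(c) => {
            *split_count = c;
            true
        }
        None => false,
    }
}

fn bridge(mut splits: usize, split_count: &mut usize, leaves: &mut usize) {
    if try_split(&mut splits) && producer_split(split_count) {
        // Both halves receive a copy of the already-halved splitter budget.
        bridge(splits, split_count, leaves);
        bridge(splits, split_count, leaves);
    } else {
        // Leaf: into_folder() runs `init`, then fold_with consumes (or returns early).
        *leaves += 1;
    }
}

fn count_init_calls(num_threads: usize) -> usize {
    let mut split_count = num_threads; // split_count starts at num_threads
    let mut leaves = 0;
    bridge(num_threads, &mut split_count, &mut leaves);
    leaves
}

fn main() {
    println!("{}", count_init_calls(4)); // prints 5
}
```

With 4 threads the model gives 5 leaves, which matches the five "Get Vector from worker" lines in the question's output.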
When using par_bridge().for_each_init(), is it expected that each thread will execute the init closure at most once while processing items, or is that just incidental behavior that could change?
The behavior is expected:
Each worker that actually starts consuming the iterator executes the init function exactly once: init runs in consumer.into_folder(), which is evaluated just before entering producer.fold_with(consumer.into_folder()) and beginning to consume the iterator (link).
After that, the worker continues consuming items until the iterator is exhausted (link).
If the same worker later re-executes producer.fold_with(consumer.into_folder()).complete(), the init function is called again, but inside producer.fold_with the thread is already marked as started, so it returns immediately without entering the loop (link).
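Finally, because split_count starts at num_threads, the job is split num_threads times, so producer.fold_with(consumer.into_folder()).complete() is called num_threads + 1 times. With only num_threads workers, at least one worker runs two of these calls, and since a worker's first call only returns once the iterator is exhausted, it’s expected that at least one thread will call the init function after all data has been consumed.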
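The effect of the per-thread started flag on repeated leaves can be modeled on a single thread. run_leaf is a hypothetical stand-in for into_folder() followed by fold_with: the counter increment plays the role of init, and the swap mirrors the "already started" check described above. Calling it twice for the same worker shows init running twice while only the first call consumes items.

```rust
use std::ops::Range;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Mutex;

// Hypothetical model of one leaf: into_folder() (which calls init) followed by
// fold_with(), guarded by this thread's "started" flag.
fn run_leaf(thread_started: &AtomicBool, iter: &Mutex<Range<usize>>, init_calls: &AtomicUsize) -> usize {
    // init runs unconditionally, before fold_with looks at the flag.
    init_calls.fetch_add(1, Ordering::SeqCst);
    // swap returns the previous value: true means this thread already consumed.
    if thread_started.swap(true, Ordering::Relaxed) {
        return 0; // return immediately: op is never called for this leaf
    }
    let mut consumed = 0;
    loop {
        // Lock, take one item, and release the lock before running op.
        let next = iter.lock().unwrap().next();
        match next {
            Some(_item) => consumed += 1, // stands in for op(&mut buffer, item)
            None => return consumed,      // iterator exhausted
        }
    }
}

fn main() {
    let iter = Mutex::new(0..10);
    let started = AtomicBool::new(false);
    let init_calls = AtomicUsize::new(0);
    // The same worker runs two leaves, one after the other.
    let first = run_leaf(&started, &iter, &init_calls);
    let second = run_leaf(&started, &iter, &init_calls);
    println!("first: {first}, second: {second}, init calls: {}", init_calls.load(Ordering::SeqCst));
}
```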
I hope this helps anyone who has the same question I had. I tried to be as brief as possible and focus on the main function responsible for this behavior.