What is the expected behavior of `for_each_init` with `par_bridge` in Rayon?

I am experimenting with Rayon’s for_each_init and I noticed some behavior that I want to clarify.

I have a Vec that I iterate using .iter().par_bridge() (which generates an IterBridge) and then call for_each_init. In the init closure I return a Vec<usize>. I observed that:

  • While there is data to process, each worker executes init once and reuses that buffer to process items.

  • After all data is processed, the init function may be called one or two more times, but the operation closure (op) is not called for these extra invocations.

Example:

use std::{thread, time::Duration};
use rayon::iter::{ParallelBridge, ParallelIterator};

fn main() {
    let v: Vec<usize> = (1..=100).collect();

    rayon::ThreadPoolBuilder::new()
        .num_threads(4)
        .build_global()
        .expect("Error creating thread pool");

    v.iter().par_bridge()
        .for_each_init(
            || {
                println!("Get Vector from worker: {:?}", rayon::current_thread_index());
                Vec::with_capacity(25)
            }, 
            |buffer, n| {
                buffer.push(n);
                if buffer.len() == 25 {
                    println!("Worker: {:?}, Buffer: {:?}", rayon::current_thread_index(), buffer.clone());
                }
                thread::sleep(Duration::from_millis(100));
            }
        );
}

Common output:

Get Vector from worker: Some(2)
Get Vector from worker: Some(0)
Get Vector from worker: Some(1)
Get Vector from worker: Some(3)
Worker: Some(1), Buffer: [...]
Worker: Some(3), Buffer: [...]
Worker: Some(0), Buffer: [...]
Worker: Some(2), Buffer: [...]
Get Vector from worker: Some(1)

However, when I use .par_iter().for_each_init() I notice that init is executed multiple times per worker during processing.

So, my question is: Does for_each_init with par_bridge guarantee that each worker executes the init closure at most once while processing all data, or is it possible that it may be called multiple times per worker, similar to .par_iter().for_each_init()?

The answer to your question is clearly no. Thread 1 calls it twice in your example.

I found this discussion: "`for_each_init` calls `init` several times per thread" (Issue #742 in rayon-rs/rayon on GitHub).

It sounds like init is called every time a job is split, which means:

  1. When a thread runs out of work and splits another thread's empty queue, it still runs init even though there's nothing to use it on.
  2. The init value isn't shared between different jobs on the same thread.

It says to use with_min_len (in this case, with a value of 25). Note that with_min_len is only available on indexed parallel iterators such as par_iter(), not on par_bridge(). You could also use par_chunks and deal with slices of items directly. But these limit rayon's ability to give work to threads that finish early, so if your jobs don't all take the same amount of time, you may want to avoid for_each_init and use a thread-local RefCell instead.
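A minimal sketch of the thread-local RefCell idea, to show that the buffer is created once per thread no matter how the work is split. To stay dependency-free it uses std::thread::scope as a stand-in for Rayon's workers; BUFFER, INIT_CALLS, process, and run are hypothetical names, not part of Rayon.

```rust
use std::cell::RefCell;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;

// Counts how many per-thread buffers were created (hypothetical helper).
static INIT_CALLS: AtomicUsize = AtomicUsize::new(0);

thread_local! {
    // Lazily created once per thread, on first access.
    static BUFFER: RefCell<Vec<usize>> = RefCell::new({
        INIT_CALLS.fetch_add(1, Ordering::SeqCst);
        Vec::with_capacity(25)
    });
}

// Push one item into the current thread's buffer and return the buffer length.
fn process(n: usize) -> usize {
    BUFFER.with(|buf| {
        let mut buf = buf.borrow_mut();
        buf.push(n);
        buf.len()
    })
}

// Run `threads` workers that each process `items_per_thread` items,
// and return how many buffers were created by this call.
fn run(threads: usize, items_per_thread: usize) -> usize {
    let before = INIT_CALLS.load(Ordering::SeqCst);
    thread::scope(|s| {
        for t in 0..threads {
            s.spawn(move || {
                for i in 0..items_per_thread {
                    process(t * items_per_thread + i);
                }
            });
        }
    });
    INIT_CALLS.load(Ordering::SeqCst) - before
}

fn main() {
    println!("buffers created: {}", run(4, 25));
}
```

With Rayon, the same BUFFER.with(...) call would go inside a plain for_each closure. Keep in mind that a thread-local outlives a single parallel call on a pooled worker, so the buffer may need to be cleared between runs.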

Thanks for your response!

Just to clarify my question: I understand that init can be called multiple times overall due to job splitting. With par_iter*().for_each_init(), it seems likely that each thread will call the init closure multiple times even while processing items.

However, with par_bridge().for_each_init(), in my experiments each thread only seems to call init once while processing data, and any additional calls only happen after all items have already been processed (and without executing the op closure).

So my question is specifically: when using par_bridge().for_each_init(), is it expected that each thread will execute the init closure at most once while processing items, or is that just incidental behavior that could change?

I believe the reason is that when using par_bridge(), there's no job-splitting.
Every thread will lock() one shared mutex and then call next() on your iterator.

Here's the code for it: par_bridge.rs
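The "lock one shared mutex, then call next()" idea can be sketched with std only. This is a simplified model, not Rayon's implementation; drain_shared is a hypothetical name, and plain scoped threads stand in for Rayon's workers.

```rust
use std::sync::Mutex;
use std::thread;

// Each worker creates its buffer once, then repeatedly locks the shared
// iterator and pulls the next item until the iterator is exhausted.
// Returns one buffer per worker.
fn drain_shared(items: Vec<usize>, workers: usize) -> Vec<Vec<usize>> {
    let shared = Mutex::new(items.into_iter());
    thread::scope(|s| {
        let mut handles = Vec::new();
        for _ in 0..workers {
            handles.push(s.spawn(|| {
                let mut buffer = Vec::new(); // plays the role of `init`
                loop {
                    // The lock is held only while `next()` runs.
                    let item = shared.lock().unwrap().next();
                    match item {
                        Some(n) => buffer.push(n), // plays the role of `op`
                        None => break,
                    }
                }
                buffer
            }));
        }
        handles.into_iter().map(|h| h.join().unwrap()).collect()
    })
}

fn main() {
    let buffers = drain_shared((1..=100).collect(), 4);
    for (worker, buffer) in buffers.iter().enumerate() {
        println!("worker {worker} processed {} items", buffer.len());
    }
}
```

How many items each worker ends up with depends on scheduling, but every item is handed out exactly once and each worker builds its buffer only once.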

Thanks! I dived into the source code and I can now clarify some things.

First, you are right: when using par_bridge, each worker locks the shared mutex and repeatedly calls next() on the iterator until all items are processed.
But before that, internally there is some job-splitting logic happening, and it’s not completely “no splitting.”

How splitting works in par_bridge

When consuming an IterBridge, Rayon calls drive_unindexed, which executes bridge_unindexed.
This in turn calls bridge_unindexed_producer_consumer
with a Splitter responsible for splitting jobs among workers.

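  • Splitter: can split a job up to log2(num_threads) + 1 times per worker (its budget starts at num_threads and halves on every split).

  • IterParallelProducer: can split itself at most num_threads times in total, which yields up to num_threads + 1 leaf jobs.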

That means if a worker steals a job and executes a fresh bridge_unindexed_producer_consumer with a new Splitter, it might attempt to split again. But if the IterParallelProducer has already reached its split limit, that worker will immediately start consuming the iterator instead of further splitting.
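The interaction of the two budgets can be checked with a small sequential simulation. This is a hypothetical model that follows the simplified description above, not Rayon's actual code; Producer, splitter_try_split, bridge, and leaf_jobs are illustrative names, and the steal_right flag is an assumption used to try both the "never stolen" and "always stolen" cases.

```rust
// Shared split budget of the producer; starts at num_threads.
struct Producer {
    split_count: usize,
}

impl Producer {
    // Returns true if the producer agrees to split once more.
    fn try_split(&mut self) -> bool {
        if self.split_count > 0 {
            self.split_count -= 1;
            true
        } else {
            false
        }
    }
}

// Per-job budget: a stolen job starts with a fresh budget of num_threads,
// and every successful check halves the budget.
fn splitter_try_split(splits: &mut usize, stolen: bool, num_threads: usize) -> bool {
    if stolen {
        *splits = num_threads;
    }
    if *splits > 0 {
        *splits /= 2;
        true
    } else {
        false
    }
}

// Recursively "bridge" a job and return how many leaf jobs start consuming.
fn bridge(
    mut splits: usize,
    stolen: bool,
    producer: &mut Producer,
    num_threads: usize,
    steal_right: bool, // assumption: whether the right half counts as stolen
) -> usize {
    if splitter_try_split(&mut splits, stolen, num_threads) && producer.try_split() {
        let left = bridge(splits, false, producer, num_threads, steal_right);
        let right = bridge(splits, steal_right, producer, num_threads, steal_right);
        left + right
    } else {
        1 // this job stops splitting and tries to consume the iterator
    }
}

fn leaf_jobs(num_threads: usize, steal_right: bool) -> usize {
    let mut producer = Producer { split_count: num_threads };
    bridge(num_threads, false, &mut producer, num_threads, steal_right)
}

fn main() {
    println!("leaf jobs, nothing stolen: {}", leaf_jobs(4, false));
    println!("leaf jobs, right halves stolen: {}", leaf_jobs(4, true));
}
```

In this model the producer's budget is the binding limit, so the number of leaf jobs comes out as num_threads + 1 in both cases.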

Example: Execution table with 4 workers

Considerations

  • If Splitter.split > 0: the worker will try to split jobs.

  • If IterParallelProducer.split_count > 0: the worker splits by calling bridge_unindexed_producer_consumer twice
    (one call may go into the StackJob for stealing).

  • If IterParallelProducer.split_count <= 0 || Splitter.split <= 0: the worker starts consuming with
    producer.fold_with(consumer.into_folder()).complete().

  • When a worker steals a job, its Splitter.split count resets to num_threads on the first call to bridge_unindexed_producer_consumer.

  • I’ve omitted certain details for simplicity.

| Time: worker | Splitter.split (per worker) | IterParallelProducer.split_count | Splitter.split > 0 | IterParallelProducer.split_count > 0 | Observation |
|---|---|---|---|---|---|
| t1: w1 | w1: 4 | 4 | true | true | Worker 1 calls bridge twice, Worker 2 steals the second call |
| t2: w1 | w1: 2, w2: 4 | 3 | true | true | Worker 1 calls bridge twice, Worker 3 steals the second call |
| t3: w2 | w1: 1, w2: 4, w3: 4 | 2 | true | true | Worker 2 calls bridge twice, Worker 4 steals the second call |
| t4: w1 | w1: 1, w2: 2, w3: 4, w4: 4 | 1 | true | true | Worker 1 calls bridge twice but no worker steals the job |
| t5: w1 | w1: 0, w2: 2, w3: 4, w4: 4 | 0 | false | --- | Worker 1 is ready to consume the iterator |
| t6: w2 | w1: 0, w2: 2, w3: 4, w4: 4 | 0 | true | false | Producer no longer splits. Worker 2 starts consuming the iterator |
| t7: w3 | w1: 0, w2: 1, w3: 4, w4: 4 | 0 | true | false | Producer no longer splits. Worker 3 starts consuming the iterator |
| t8: w4 | w1: 0, w2: 1, w3: 2, w4: 4 | 0 | true | false | Producer no longer splits. Worker 4 starts consuming the iterator |
| t9: wX | w1: 0, w2: 1, w3: 2, w4: 2 | 0 | false | --- | Extra splitting attempt runs but does nothing (tried to consume the iterator, but the worker was already marked as started) |

Visual representation (simplified):

                                                           w1: bridge() 
                                                         /              \
                                                    /                        \
                                             /                                     \
                                 w1: bridge()                                       w2: bridge()
                              /               \                                    /             \
                  w1: bridge()                 w3: bridge()           w2: bridge()                 w4: bridge()
                    /      \                        |                      |                            |
         w1: bridge()      w1: bridge()        try consume            try consume                  try consume
              |                 |                   |                      |                            |
         try consume       try consume          consuming              consuming                    consuming
              |                 |
          consuming             X

Answer to my original doubt

When using par_bridge().for_each_init(), is it expected that each thread will execute the init closure at most once while processing items, or is that just incidental behavior that could change?

The behavior is expected:

  • Each worker that actually starts consuming the iterator executes the init function exactly once, before entering producer.fold_with(consumer.into_folder())
    and beginning to consume the iterator (link).

  • After that, the worker continues consuming items until the iterator is exhausted (link).

  • If the same worker later re-executes producer.fold_with(consumer.into_folder()).complete(), the init function will be called again, but inside
    producer.fold_with the thread will be marked as already started, so it will return immediately without entering the loop
    (link).

  • Finally, producer.fold_with(consumer.into_folder()).complete() will be called num_threads + 1 times, so it’s expected that at least one thread
    will call the init function after all data has been consumed.
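The four points above can be modelled with std only. The sketch below is a hypothetical simplification, not Rayon's code: each spawned thread stands for one worker and runs a given number of leaf jobs in sequence, every leaf calls "init", and a per-worker flag makes later leaves return without touching the iterator. The name simulate and the leaves_per_worker parameter are assumptions made for illustration.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Mutex;
use std::thread;

// Returns (init_calls, op_calls) for a run in which worker `w` executes
// `leaves_per_worker[w]` leaf jobs one after another.
fn simulate(leaves_per_worker: &[usize], items: usize) -> (usize, usize) {
    let iter = Mutex::new(0..items);
    let init_calls = AtomicUsize::new(0);
    let op_calls = AtomicUsize::new(0);
    thread::scope(|s| {
        for &leaves in leaves_per_worker {
            let (iter, init_calls, op_calls) = (&iter, &init_calls, &op_calls);
            s.spawn(move || {
                let mut started = false; // per-worker "already started" flag
                for _ in 0..leaves {
                    // Every leaf job builds a folder, which runs `init`.
                    init_calls.fetch_add(1, Ordering::SeqCst);
                    let mut buffer: Vec<usize> = Vec::new();
                    // A worker that already started returns immediately.
                    if started {
                        continue;
                    }
                    started = true;
                    loop {
                        let next = iter.lock().unwrap().next();
                        match next {
                            Some(n) => {
                                buffer.push(n); // `op` runs once per item
                                op_calls.fetch_add(1, Ordering::SeqCst);
                            }
                            None => break,
                        }
                    }
                }
            });
        }
    });
    (init_calls.load(Ordering::SeqCst), op_calls.load(Ordering::SeqCst))
}

fn main() {
    // Four workers, five leaf jobs: the first worker gets the extra leaf.
    let (inits, ops) = simulate(&[2, 1, 1, 1], 100);
    println!("init calls: {inits}, op calls: {ops}");
}
```

With four workers and five leaf jobs, the model reports five init calls but only 100 op calls, which matches the single extra "Get Vector from worker" line in the output shown earlier in this thread.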

I hope this helps someone who has the same doubt I had. I tried to be as brief as possible and focus on the main function responsible for this behavior.
