Acquiring a per-thread mutex in rayon kills parallelism

Hello,

The following snippet (playground) creates a rayon thread pool and a vector of per-thread data behind mutexes, then processes the data in chunks.

use rayon::prelude::*;
use std::sync::{Arc, Mutex};
fn main() -> anyhow::Result<()> {
    // Create pool
    let pool_size = 10;
    let pool = rayon::ThreadPoolBuilder::new()
        .num_threads(pool_size)
        .build()
        .unwrap();

    // Per-thread data behind a mutex
    let per_thread: Vec<Arc<Mutex<()>>> = vec![Default::default(); pool_size];

    let data = vec![10.0; 100];
    pool.install(|| {
        data.into_par_iter()
            .chunks(100 / pool_size)
            .map(|_x| {
                let thread = rayon::current_thread_index().unwrap();
                println!("start in {}", thread);
                let _guard = per_thread[thread].lock().unwrap(); // guard is held until the closure returns
                std::thread::sleep(std::time::Duration::from_secs(1));
                println!("done in {}", thread);
            })
            .collect::<Vec<_>>();
    });

    Ok(())
}

The 10 tasks start simultaneously in different threads as expected:

start in 0
start in 7
start in 1
start in 2
start in 9
start in 5
start in 3
start in 8
start in 4
start in 6

However, they execute sequentially!

done in 1
[1 second]
done in 5
[1 second]
...

Removing the .lock() call restores parallel execution, and replacing lock with try_lock makes it panic. This should not happen, since each thread is supposed to acquire a different mutex...
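The try_lock panic is itself a useful diagnostic. Below is a minimal std-only sketch, assuming the same vec![Default::default(); n] construction as in the snippet: Mutex::try_lock returns Err(TryLockError::WouldBlock) when the mutex is already locked, so unwrapping it panics whenever another slot's guard is held on the same underlying mutex. The helper blocked_by_other is hypothetical and exists only for this illustration.

```rust
use std::sync::{Arc, Mutex, TryLockError};

/// Hypothetical helper: locks `a`, then reports whether `try_lock` on `b`
/// fails with `WouldBlock`, i.e. whether `a` and `b` share one mutex.
fn blocked_by_other(a: &Arc<Mutex<()>>, b: &Arc<Mutex<()>>) -> bool {
    let _guard = a.lock().unwrap();
    let blocked = matches!(b.try_lock(), Err(TryLockError::WouldBlock));
    blocked
}

fn main() {
    // Built the same way as `per_thread` in the snippet above.
    let per_thread: Vec<Arc<Mutex<()>>> = vec![Default::default(); 2];
    // Only slot 0 was locked, yet slot 1 reports as locked too.
    println!("{}", blocked_by_other(&per_thread[0], &per_thread[1])); // prints true
}
```

If the slots held independent mutexes, try_lock on slot 1 would succeed and the function would return false.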

The same happens with the simpler example:

vec![vec![1, 2, 3], vec![4, 5, 6]].into_par_iter().map(|_x| { ... })

On the other hand, calling pool.spawn manually runs the tasks in parallel.

I feel like I am missing something obvious in either Rayon's model or the behaviour of mutexes... I know that rayon performs work stealing, but I assume a function does not move to another thread once it has started executing.

Does anyone see an explanation?

vec![Default::default(); pool_size] initializes per_thread with pool_size clones of a single Arc, all sharing the same mutex. Initialize it with something like (0..pool_size).map(|_| Default::default()).collect(); instead if you want a new mutex for each thread.
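A minimal std-only sketch contrasting the two constructions, assuming the element type Arc<Mutex<()>> from the snippet. The function names shared_mutexes, independent_mutexes and aliases_of_first are hypothetical; Arc::ptr_eq is used to check whether two slots point to the same allocation.

```rust
use std::sync::{Arc, Mutex};

/// What the original snippet does: vec![x; n] clones x, and cloning an Arc
/// only copies the pointer, so every slot refers to one shared mutex.
fn shared_mutexes(n: usize) -> Vec<Arc<Mutex<()>>> {
    vec![Default::default(); n]
}

/// The suggested fix: the closure runs once per index, so each call to
/// Default::default() allocates a fresh Arc<Mutex<()>>.
fn independent_mutexes(n: usize) -> Vec<Arc<Mutex<()>>> {
    (0..n).map(|_| Default::default()).collect()
}

/// Counts the slots (including slot 0 itself) that point to the same
/// allocation as slot 0; returns 0 for an empty vector.
fn aliases_of_first(v: &[Arc<Mutex<()>>]) -> usize {
    match v.first() {
        Some(first) => v.iter().filter(|m| Arc::ptr_eq(first, *m)).count(),
        None => 0,
    }
}

fn main() {
    let pool_size = 10;
    println!("shared: {}", aliases_of_first(&shared_mutexes(pool_size))); // prints "shared: 10"
    println!("independent: {}", aliases_of_first(&independent_mutexes(pool_size))); // prints "independent: 1"
}
```

In the rayon snippet only the per_thread line changes. Since the pool.install closure only borrows per_thread, a plain Vec<Mutex<()>> appears to work there as well, and because Mutex does not implement Clone, vec![Mutex::new(()); pool_size] would be rejected at compile time, which rules out this particular trap.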


Thanks, I missed the obvious... That trap is even described in the documentation:

This will use clone to duplicate an expression, so one should be careful using this with types having a nonstandard Clone implementation. For example, vec![Rc::new(1); 5] will create a vector of five references to the same boxed integer value, not five references pointing to independently boxed integers.
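The Rc case from the quoted documentation can be checked directly. This is a small sketch; the helper names via_vec_macro and via_collect are hypothetical, and Rc::strong_count / Rc::ptr_eq are used to show whether the elements share one allocation.

```rust
use std::rc::Rc;

/// vec![elem; n] clones elem; cloning an Rc creates another handle to the
/// same allocation, so all five slots share one boxed integer.
fn via_vec_macro() -> Vec<Rc<i32>> {
    vec![Rc::new(1); 5]
}

/// Calling Rc::new once per element allocates five independent integers.
fn via_collect() -> Vec<Rc<i32>> {
    (0..5).map(|_| Rc::new(1)).collect()
}

fn main() {
    let shared = via_vec_macro();
    let independent = via_collect();
    println!("{}", Rc::strong_count(&shared[0])); // prints 5
    println!("{}", Rc::strong_count(&independent[0])); // prints 1
    println!("{}", Rc::ptr_eq(&shared[0], &shared[4])); // prints true
}
```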
