Acquiring a per-thread mutex in rayon kills parallelism

Hello,

The following snippet (playground) creates a rayon thread pool and a vector of per-thread data behind mutexes, then processes the data in chunks.

use rayon::prelude::*;
use std::sync::{Arc, Mutex};
fn main() -> anyhow::Result<()> {
    // Create pool
    let pool_size = 10;
    let pool = rayon::ThreadPoolBuilder::new()
        .num_threads(pool_size)
        .build()
        .unwrap();

    // Per-thread data behind a mutex
    let per_thread: Vec<Arc<Mutex<()>>> = vec![Default::default(); pool_size];

    let data = vec![10.0; 100];
    pool.install(|| {
        data.into_par_iter()
            .chunks(100 / pool_size)
            .map(|_x| {
                let thread = rayon::current_thread_index().unwrap();
                println!("start in {}", thread);
                let data = per_thread[thread].lock();
                std::thread::sleep(std::time::Duration::from_secs(1));
                println!("done in {}", thread);
            })
            .collect::<Vec<_>>();
    });

    Ok(())
}

The 10 tasks start simultaneously in different threads as expected:

start in 0
start in 7
start in 1
start in 2
start in 9
start in 5
start in 3
start in 8
start in 4
start in 6

However, they execute sequentially!

done in 1
[1 second]
done in 5
[1 second]
...

Removing the .lock() call restores parallel execution. Replacing lock with try_lock panics. Neither the serialization nor the panic should happen, since each thread should acquire a different mutex...

The same happens with the simpler example

vec![vec![1, 2, 3], vec![4, 5, 6]].into_par_iter().map(|_x| { ... })

On the other hand, doing pool.spawn manually executes the tasks in parallel.

I feel like I am missing something obvious in either Rayon's model, or in the behaviour of mutexes... I know that rayon performs work stealing, but I assume a function should not move to another thread once it has started executing.

Does anyone see an explanation?

vec![Default::default(); pool_size] initializes per_thread with a bunch of Arc clones that all share the same mutex. If you want a new mutex for each thread, initialize it with something like (0..pool_size).map(|_| Default::default()).collect(); instead.
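
To make the difference concrete, here is a minimal std-only sketch (no rayon needed). The helper names shared_mutexes and distinct_mutexes are made up for illustration. It compares the two ways of building the vector and shows why try_lock fails in the shared case:

```rust
use std::sync::{Arc, Mutex};

/// Builds the vector the way the original snippet does: `vec![elem; n]`
/// clones a single `Arc`, so every slot points at the same mutex.
/// (Hypothetical helper name, for illustration only.)
fn shared_mutexes(n: usize) -> Vec<Arc<Mutex<()>>> {
    vec![Default::default(); n]
}

/// Builds one fresh `Arc<Mutex<()>>` per slot by calling
/// `Default::default()` once for each index.
/// (Hypothetical helper name, for illustration only.)
fn distinct_mutexes(n: usize) -> Vec<Arc<Mutex<()>>> {
    (0..n).map(|_| Default::default()).collect()
}

fn main() {
    let shared = shared_mutexes(4);
    let distinct = distinct_mutexes(4);

    // `Arc::ptr_eq` tells us whether two `Arc`s point at the same allocation.
    println!("shared:   same mutex? {}", Arc::ptr_eq(&shared[0], &shared[1]));
    println!("distinct: same mutex? {}", Arc::ptr_eq(&distinct[0], &distinct[1]));

    // While slot 0 is locked, slot 1 is only unavailable if it is the same mutex.
    let _g0 = shared[0].lock().unwrap();
    println!("shared:   try_lock on slot 1 fails? {}", shared[1].try_lock().is_err());

    let _d0 = distinct[0].lock().unwrap();
    println!("distinct: try_lock on slot 1 fails? {}", distinct[1].try_lock().is_err());
}
```

With the shared version, every rayon worker in the original snippet queues up on the one mutex, which would explain both the sequential "done" lines and a panic when the result of try_lock is unwrapped.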

Thanks, I missed the obvious... That trap is even described in the documentation:

This will use clone to duplicate an expression, so one should be careful using this with types having a nonstandard Clone implementation. For example, vec![Rc::new(1); 5] will create a vector of five references to the same boxed integer value, not five references pointing to independently boxed integers.
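
For reference, the Rc case from that note can be checked directly with Rc::strong_count. The sketch below also shows std::iter::repeat_with as one possible alternative that builds a fresh value per element; the helper names cloned_rcs and fresh_rcs are made up for illustration:

```rust
use std::rc::Rc;

/// `vec![elem; n]` clones `elem`; for `Rc` a clone only bumps the
/// reference count, so all slots share one allocation.
/// (Hypothetical helper name, for illustration only.)
fn cloned_rcs(n: usize) -> Vec<Rc<i32>> {
    vec![Rc::new(1); n]
}

/// `repeat_with` calls the closure once per element, so every slot
/// gets its own independently allocated `Rc`.
/// (Hypothetical helper name, for illustration only.)
fn fresh_rcs(n: usize) -> Vec<Rc<i32>> {
    std::iter::repeat_with(|| Rc::new(1)).take(n).collect()
}

fn main() {
    let cloned = cloned_rcs(5);
    let fresh = fresh_rcs(5);

    // The strong count is the number of `Rc` handles sharing the allocation.
    println!("cloned: strong_count = {}", Rc::strong_count(&cloned[0]));
    println!("fresh:  strong_count = {}", Rc::strong_count(&fresh[0]));

    // `Rc::ptr_eq` confirms whether two handles point at the same value.
    println!("cloned share an allocation: {}", Rc::ptr_eq(&cloned[0], &cloned[4]));
    println!("fresh share an allocation:  {}", Rc::ptr_eq(&fresh[0], &fresh[4]));
}
```

The same pattern applies to Arc<Mutex<T>>: anything whose Clone shares state rather than copying it needs to be constructed once per element.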