How to use rayon ThreadPool in a sync way?

I have a set of small tasks that I split into chunks. I want to run a function on every chunk in a ThreadPool and be able to wait until all of them finish (or panic).

But ThreadPool::install works synchronously: it blocks until its closure returns. It looks like even if I nest it inside another install call, everything will still run sequentially.

(I thought of just calling spawn and then monitoring a flag variable, but maybe there's a better way?)

Here's the snippet:

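use std::sync::{Arc, Mutex};
use std::thread::sleep;
use std::time::Duration;
use rayon::ThreadPoolBuilder;

let results: Arc<Mutex<Vec<usize>>> = Arc::new(Mutex::new(vec![]));
let tpool = ThreadPoolBuilder::new().build().unwrap();
for (i, items) in item_chunks.into_iter().enumerate() {
    // Give each closure its own handle; moving `results` itself would only work once.
    let results = Arc::clone(&results);
    tpool.install(move || {
        println!("thread {i} started");
        sleep(Duration::from_secs(3));
        println!("thread {i} waited 3 secs");
        results.lock().unwrap().push(i);
    });
}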

The output:

thread 0 started
thread 0 waited 3 secs
thread 1 started
thread 1 waited 3 secs
thread 2 started
thread 2 waited 3 secs
thread 3 started
thread 3 waited 3 secs

Instead, I want them to run in parallel, and then somehow await/join the ThreadPool with a guarantee that all the tasks are done.

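install() is not the operation you want: it runs one closure inside the pool and blocks until that closure returns, so calling it in a loop processes the chunks one at a time. To get actual parallelism, you need to establish a scope() and then spawn() tasks within the scope, or use join(). Also, you don't need Arc, since tasks spawned in a scope can borrow data from outside it.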

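use std::sync::Mutex;
use std::thread::sleep;
use std::time::Duration;
use rayon::ThreadPoolBuilder;

let results: &Mutex<Vec<usize>> = &Mutex::new(vec![]);
let tpool = ThreadPoolBuilder::new().build().unwrap();
// scope() returns only after every task spawned inside it has finished.
tpool.scope(|scope| {
    for (i, items) in item_chunks.into_iter().enumerate() {
        // Scope::spawn passes a &Scope to the closure, hence the `|_|` argument.
        scope.spawn(move |_| {
            println!("thread {i} started");
            sleep(Duration::from_secs(3));
            println!("thread {i} waited 3 secs");
            results.lock().unwrap().push(i);
        });
    }
});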

The important part of this version of the code is that there is a single call to scope() enclosing everything you want to execute in parallel, with one spawn() per task. scope() does not return until all of the spawned tasks have completed.

However, there is a further improvement to be made. If you are not customizing the thread pool, you do not need to create one; use Rayon's global thread pool, which it creates automatically.

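use std::sync::Mutex;
use std::thread::sleep;
use std::time::Duration;

let results: &Mutex<Vec<usize>> = &Mutex::new(vec![]);
// rayon::scope runs on the global pool and returns once all spawned tasks finish.
rayon::scope(|scope| {
    for (i, items) in item_chunks.into_iter().enumerate() {
        scope.spawn(move |_| {
            println!("thread {i} started");
            sleep(Duration::from_secs(3));
            println!("thread {i} waited 3 secs");
            results.lock().unwrap().push(i);
        });
    }
});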

The reason you should do this is that if your program runs several of these parallel operations at the same time, creating a thread pool for each one is less efficient than sharing a single, properly sized pool.

Also, if possible for your real application, you should use Rayon parallel iterators instead of a Mutex:

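use rayon::prelude::*;
use std::thread::sleep;
use std::time::Duration;

let results: Vec<usize> =
    item_chunks.into_par_iter()
        .enumerate()
        .map(|(i, items)| {
            println!("chunk {i} started");
            sleep(Duration::from_secs(3));
            println!("chunk {i} waited 3 secs");
            i
        })
        // collect() returns once every chunk is done, with results in chunk order.
        .collect();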

The advantage of this approach is that there is no contention for the mutex, because there is no mutex.


Thank you very much! This makes sense. I knew all the bits, but didn't understand how to use them together.

Using par_iter is actually a good idea. I had rejected it earlier because the output has to be collected all together at once, but I missed that you can also use it just to launch tasks.