How to dispatch work to rayon::ThreadPool

I wrote a small program that reads in data and dispatches it (in chunks) to threads. Using a wrapper around Vec<Option<JoinHandle<()>>>, I was able to get a decent interface for the concurrent processing I wanted.
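Roughly, the wrapper boiled down to joining whatever handle was sitting in a slot before spawning a new thread into it (a heavily simplified sketch; the names are illustrative):

use std::thread::JoinHandle;

struct Workers {
    handles: Vec<Option<JoinHandle<()>>>,
}

impl Workers {
    fn new(n: usize) -> Self {
        Self { handles: (0..n).map(|_| None).collect() }
    }

    // Spawn `f` into slot `idx`, joining whatever ran there before.
    fn dispatch(&mut self, idx: usize, f: impl FnOnce() + Send + 'static) {
        if let Some(handle) = self.handles[idx].take() {
            handle.join().unwrap();
        }
        self.handles[idx] = Some(std::thread::spawn(f));
    }
}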

Now I want to try switching to an established crate to handle this, and rayon::ThreadPool seemed like a good fit.

However, unlike my hand-rolled solution, my rayon attempt only ever runs on a single thread until it finishes. Simplified:

let pool = rayon::ThreadPoolBuilder::new().num_threads(10).build().unwrap();
println!("{}", pool.current_num_threads()); // 10, correct

let output: Arc<Mutex<SharedData>> = …;

let mut finished = false;
while !finished {
    let output = output.clone();
    let mut input = String::new();
    finished = run_producer(&mut input);

    pool.install(move || {
        let results = process(&input);
        
        let mut output = output.lock().unwrap();
        output.insert(results);
    });
}

Maybe I'm misunderstanding what rayon::ThreadPool::install() does? Conceptually, I'm basically using it as a replacement for std::thread::spawn(). Do I have to move the entire loop into a single closure that I pass to ::install(), so that the thread pool schedules the work from nested spawns within it?

The install function is indeed not equivalent to spawn. It merely ensures that if you call spawn (and similar) inside the closure, the spawned task is spawned on the provided thread pool, and not some other thread pool somewhere else. The call to install is blocking.
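For illustration, a minimal sketch showing both points (the value only comes back once the closure has finished running on the pool):

let pool = rayon::ThreadPoolBuilder::new().num_threads(4).build().unwrap();

// install() runs the closure on one of the pool's workers and blocks until it returns.
// Any rayon::spawn / rayon::scope / parallel iterator used inside targets `pool`,
// not the global thread pool.
let n = pool.install(|| rayon::current_num_threads());

// Only reached after the closure above has completed.
println!("{n}"); // 4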

I see. How would I refactor my code to utilize this? E.g.:

let pool = rayon::ThreadPoolBuilder::new().build().unwrap();

let output: Arc<Mutex<SharedData>> = …;

pool.install(|| {
    let mut finished = false;
    while !finished {
        let output = output.clone();
        let mut input = String::new();
        finished = write_to_input(&mut input);

        rayon::spawn(move || {
            let results = process(&input);
        
            let mut output = output.lock().unwrap();
            output.insert(results);
        });
    }
});

I also tried several other approaches, e.g.:

pool.install(|| {
    rayon::scope(|s| {
        s.spawn(|_| {
            … 
        });
    });
});

They resulted in more boilerplate and worse performance (may be related to the nature of the data flow). So I'm wondering if I'm doing something wrong.

A little more direct would be pool.scope(|s| ... s.spawn(|_| ...)). You should make sure the pool has at least two threads though, otherwise the spawn calls will just queue up until the scope returns -- that closure is also running on the thread pool.

It's hard to guess at the performance from your pseudo-code, but you'll want the process computation to have a fair amount of weight for this to pay off. If it's pretty small, you could try batching several together per spawn, locking the map just once for the group as late as possible.
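Something along these lines is what I mean -- a rough sketch only, reusing your placeholders (run_producer, process, SharedData, with SharedData::new() assumed just to make the snippet complete) and an arbitrary BATCH size to tune:

use std::sync::{Arc, Mutex};

let pool = rayon::ThreadPoolBuilder::new().num_threads(10).build().unwrap();
let output: Arc<Mutex<SharedData>> = Arc::new(Mutex::new(SharedData::new()));

const BATCH: usize = 16; // arbitrary; tune to the weight of process()

pool.scope(|s| {
    // Note: this closure occupies one worker itself, hence the "at least two threads" advice.
    let mut finished = false;
    while !finished {
        // Gather a group of inputs on the producing side.
        let mut batch = Vec::with_capacity(BATCH);
        while batch.len() < BATCH && !finished {
            let mut input = String::new();
            finished = run_producer(&mut input);
            batch.push(input);
        }

        let output = output.clone();
        s.spawn(move |_| {
            // Process the whole group, then take the output lock only once.
            let results: Vec<_> = batch.iter().map(|input| process(input)).collect();
            let mut output = output.lock().unwrap();
            for result in results {
                output.insert(result);
            }
        });
    }
});
// scope() returns once every spawned task has finished.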

Thank you! Starting with pool.scope(|s| s.spawn(move |_| …)) is much cleaner already.

I also realized that during my attempts to get the rayon thread pool working, I had simplified a little by switching from persistent input buffers (fill, dispatch, refill, dispatch, …) to moving new buffers into each thread. That may well explain the performance hit.

In my hand-rolled approach, I used an index within the loop that allowed me to fill a buffer and access it within the next spawned thread (simplified):

let output: Arc<Mutex<SharedData>> = …;
let buffers: Arc<Vec<Mutex<String>>> = …; // construct to size

loop {
    let output = output.clone();
    let buffers = buffers.clone();

    let thread_idx = next_thread_idx(); // 0, 1, …N, 0, 1…
    {
        let mut buffer = buffers[thread_idx].lock().unwrap();
        fill_buffer(&mut buffer);
    }
    
    thread::spawn(move || {
        let buffer = buffers[thread_idx].lock().unwrap();
        let result = process(&buffer);

        let mut output = output.lock().unwrap();
        output.insert(result);
    });
}

This was simple to write, and the round-robin (FIFO-like) order guaranteed virtually no lock contention. For uniform data, which took each thread a similar amount of time to process, this was quite efficient; but I noticed that the data wasn't so uniform after all.

Any idea how I might approach this with a rayon ThreadPool? I imagine one would ditch the numeric thread_idx approach. The main thread/loop could go over all the buffer elements with Mutex::try_lock() (with a little sleep in between) until it can acquire a buffer to refill. The spawned threads hold their buffer locks during processing, so the order of operations should be sound, no?
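Roughly what I'm picturing -- a sketch only, assuming fill_buffer reports end-of-input the way run_producer did, and relying on the same timing assumption as the round-robin version (the producer doesn't lap a buffer before its worker has locked it):

use std::sync::{Arc, Mutex};
use std::time::Duration;

let pool = rayon::ThreadPoolBuilder::new().num_threads(10).build().unwrap();
let output: Arc<Mutex<SharedData>> = Arc::new(Mutex::new(SharedData::new())); // placeholder
let buffers: Arc<Vec<Mutex<String>>> =
    Arc::new((0..pool.current_num_threads()).map(|_| Mutex::new(String::new())).collect());

pool.scope(|s| {
    let mut finished = false;
    while !finished {
        // Producer: scan for a buffer no worker currently holds, refill it, remember its index.
        let idx = loop {
            let free = (0..buffers.len()).find_map(|i| {
                buffers[i].try_lock().ok().map(|mut buffer| {
                    finished = fill_buffer(&mut buffer); // assumed to return true at end of input
                    i
                })
            });
            match free {
                Some(i) => break i,
                None => std::thread::sleep(Duration::from_micros(50)), // "a little sleep"
            }
        };

        let output = output.clone();
        let buffers = buffers.clone();
        s.spawn(move |_| {
            // Worker: re-acquire the same buffer and hold the lock for the whole processing step.
            let buffer = buffers[idx].lock().unwrap();
            let result = process(&buffer);

            let mut output = output.lock().unwrap();
            output.insert(result);
        });
    }
});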

You could use rayon::current_thread_index() in a similar fashion, and I expect the locking will be completely uncontended. However, make sure that process doesn't make any further rayon calls that may trigger work-stealing, otherwise you can end up re-entering the mutex for a separate spawn while the first is stalled.
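For reference, rayon::current_thread_index() only returns an index when called from a thread that belongs to a pool; a tiny illustration:

let pool = rayon::ThreadPoolBuilder::new().num_threads(4).build().unwrap();

// Outside any pool (e.g. on the main thread): no index.
assert_eq!(rayon::current_thread_index(), None);

pool.install(|| {
    // Inside the pool: Some(i) with i in 0..num_threads, stable for this particular worker.
    let idx = rayon::current_thread_index().unwrap();
    assert!(idx < rayon::current_num_threads());
});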

I tried rayon::current_thread_index(), but I don't think it works for the refill part. Within the loop, it always gave me the same index; the spawned threads had changing indices.

Oh, I missed that you want to generate some of that before the spawn. Then yeah, the thread index isn't correct, as that can't be predicted ahead of time.

It might still come in handy for the buffer-reading part though, since a given buffer/index won't be contended by threads other than the main producer (in my example scenario). Just realized that.