Scoped threads do not seem to be running in parallel

I have a function that takes a closure and applies it in parallel over a list of arguments. I already have a working implementation using rayon, but now I'm trying to implement it with the standard library's scoped threads. The current implementation is below:

fn run_with_output<T, U, F>(f: F, input_list: &[&T], num_threads: usize) -> Vec<U> 
where
    T: ThreadSafe,
    U: ThreadSafe,
    F: Fn(&T) -> U + ThreadSafe,
{
    let mut result: Vec<_> = Vec::with_capacity(input_list.len());
    let chunks = input_list.chunks(num_threads);
    thread::scope(|sc| {
        let mut handles: Vec<_> = Vec::with_capacity(num_threads);
        handles.extend(chunks.into_iter().map(|chunk| {
            sc.spawn(|| {
                let result: Vec<_> = chunk.iter().map(|x| f(x)).collect();
                result
            })
        }));

        for handle in handles {
            result.extend(handle.join().expect("Thread being joined has panicked!").into_iter());
        }
    });
    result
}

I then tested it using a closure that sleeps:

#[test]
fn ShouldRunInParallel_RunWithOutput() {
    let sleep_time: Duration = Duration::new(0, 500000000);
    let f = |x: &i32| {thread::sleep(sleep_time); x*3};
    let inputs = [&1, &2, &3];

    let start = Instant::now();
    let results = run_with_output(f, &inputs, 3);
    let runtime = start.elapsed().as_millis();
    println!("{}", runtime);
    let answers = [3, 6, 9];

    assert_eq!(results, answers.to_vec());
    assert!(runtime < 600);
}

The output is correct; however, the runtime (a bit over 1500 ms) indicates that execution happened serially rather than in parallel. Is there something obvious I have missed? Thanks!

On second thought, it's probably because the sleep applies to the main thread rather than the actual spawned thread. I'll test it out.

Edit: Yep, that's the cause. What would be a relatively simple way to modify the test so that the sleep happens in the correct child threads?

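Note that slice.chunks(N) divides the slice into chunks of length N, producing as many chunks as needed to cover the full slice. With three inputs and num_threads = 3, that yields a single chunk, so one thread processes all three items in sequence. I think you wanted something more like slice.chunks(slice.len() / N)?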
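To make the chunking point concrete, here is a minimal sketch of the function with the chunk size derived from the thread count. It is an illustration, not the poster's final code: the ThreadSafe trait from the question is not shown in the thread, so the sketch assumes plain Send/Sync bounds instead. It also uses ceiling division rather than slice.len() / N, so the number of chunks never exceeds num_threads, and the guards for empty input and num_threads == 0 are additions.

```rust
use std::thread;

// Sketch only: `Sync`/`Send` bounds stand in for the question's
// `ThreadSafe` trait, whose definition is not shown in the thread.
fn run_with_output<T, U, F>(f: F, input_list: &[&T], num_threads: usize) -> Vec<U>
where
    T: Sync,
    U: Send,
    F: Fn(&T) -> U + Sync,
{
    // `chunks(0)` panics, so return early when there is nothing to do.
    if input_list.is_empty() {
        return Vec::new();
    }

    // Ceiling division: at most `num_threads` chunks, each of this length
    // (the last one may be shorter). Treat 0 threads as 1.
    let threads = num_threads.max(1);
    let chunk_size = (input_list.len() + threads - 1) / threads;

    // Share the closure by reference so every worker can call it.
    let f = &f;

    thread::scope(|sc| {
        // Spawn one scoped thread per chunk before joining any of them.
        let handles: Vec<_> = input_list
            .chunks(chunk_size)
            .map(|chunk| sc.spawn(move || chunk.iter().map(|x| f(*x)).collect::<Vec<U>>()))
            .collect();

        // Join in spawn order so results keep the input order.
        handles
            .into_iter()
            .flat_map(|h| h.join().expect("Thread being joined has panicked!"))
            .collect::<Vec<U>>()
    })
}
```

With this chunk size, the test from the question (three inputs, num_threads = 3) gets three chunks of one item each, so each call to f runs on its own thread. Note that thread::sleep blocks whichever thread calls it, so a sleeping closure invoked inside sc.spawn sleeps on the spawned thread.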


Ouch, good catch!
