How to convert a recursive algorithm to a multi-threaded version?

I'm dealing with code that is fairly easy to write with recursion and a single thread in mind. I don't think recursion will be a problem for this algorithm, because I expect the exponential branching factor to keep it from going very deep.

What is causing me trouble is the conversion to multi-threaded code. In my first attempt, I simply copied the first level of the algorithm and used rayon to parallelize that level. The result wasn't very satisfying, because the number of cores doesn't always match the number of generated states: at the end of the iteration, when only a few branches remain to be calculated at the root level, some cores stay idle.

The solution I'm looking for has to parallelize below the root level of the recursion as well, whenever necessary. As it turns out, that's quite a challenge, and I haven't found any helpful resources yet.

Any kind of help is welcome. Thanks in advance!

use rand::thread_rng;
use rand::Rng;

fn main() {
    println!("{:?}", calculate(OpaqueState::default(), 3));
}

fn calculate(state: OpaqueState, depth: u8) -> OpaqueResult {
    if 0 == depth {
        OpaqueResult::one()
    } else {
        opaque_op(opaque_iter(state).map(|state| calculate(state, depth - 1)))
    }
}

fn opaque_iter(_state: OpaqueState) -> impl Iterator<Item = OpaqueState> {
    // rand 0.8+ takes a range here; rand 0.7 accepted gen_range(0, 20) instead.
    (0..thread_rng().gen_range(0..20)).map(|_| OpaqueState::default())
}

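// u64 instead of u8: with up to 19 children per level the leaf count can exceed 255,
// and the u8 addition in opaque_op would panic on overflow in debug builds.
#[derive(Default, Debug)]
struct OpaqueResult(u64);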

impl OpaqueResult {
    fn one() -> Self {
        Self(1)
    }
}

fn opaque_op(t: impl Iterator<Item = OpaqueResult>) -> OpaqueResult {
    t.fold(OpaqueResult::default(), |acc, item| {
        OpaqueResult(acc.0 + item.0)
    })
}

#[derive(Default)]
struct OpaqueState(());


Output:

OpaqueResult(255)

Where is rayon used in this code? In general, you are probably looking for rayon's join function.
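rayon::join(a, b) takes two closures, may run them in parallel, and returns both results. Because each closure can call join again, the fork can happen at every level of the recursion rather than only at the root. The sketch below assumes a tree with a fixed branching factor instead of the random opaque_iter, and, to stay runnable without the rayon crate, it uses a hypothetical std-only join built on std::thread::scope (Rust 1.63+). That stand-in spawns a real OS thread per call, so it relies on an assumed SEQUENTIAL_DEPTH cutoff; children, BRANCHING, SEQUENTIAL_DEPTH and sum_children are illustrative names, not part of any library.

```rust
use std::thread;

// Hypothetical stand-in for rayon::join: runs `a` on a scoped helper thread and
// `b` on the current thread, then returns both results. Unlike rayon, every
// call spawns a fresh OS thread (no pool, no work stealing).
fn join<A, B, RA, RB>(a: A, b: B) -> (RA, RB)
where
    A: FnOnce() -> RA + Send,
    B: FnOnce() -> RB + Send,
    RA: Send,
    RB: Send,
{
    thread::scope(|s| {
        let handle = s.spawn(a);
        let rb = b();
        (handle.join().expect("joined closure panicked"), rb)
    })
}

// Hypothetical fixed branching factor replacing the random opaque_iter.
const BRANCHING: u64 = 3;
// Assumed cutoff: at or below this remaining depth, recurse sequentially.
const SEQUENTIAL_DEPTH: u8 = 2;

// Hypothetical replacement for opaque_iter: every state has BRANCHING children.
fn children(state: u64) -> Vec<u64> {
    (0..BRANCHING).map(|i| state * BRANCHING + i).collect()
}

// Evaluates sibling states by splitting the slice in half and handing both
// halves to `join`; each half may split again, and so may every child.
fn sum_children(kids: &[u64], depth: u8) -> u64 {
    match kids.len() {
        0 => 0,
        1 => calculate(kids[0], depth),
        n => {
            let (left, right) = kids.split_at(n / 2);
            let (a, b) = join(|| sum_children(left, depth), || sum_children(right, depth));
            a + b
        }
    }
}

// Counts the leaves of the tree, like calculate in the original post.
fn calculate(state: u64, depth: u8) -> u64 {
    if depth == 0 {
        return 1;
    }
    let kids = children(state);
    if depth <= SEQUENTIAL_DEPTH {
        // Near the leaves the tasks are small, so stay on this thread.
        kids.into_iter().map(|k| calculate(k, depth - 1)).sum()
    } else {
        // Near the root: fork at this level and, recursively, at the levels below.
        sum_children(&kids, depth - 1)
    }
}

fn main() {
    // 3 children per node, 4 levels deep: 3^4 leaves.
    println!("{}", calculate(0, 4)); // prints 81
}
```

With rayon itself, you would delete the local join and call rayon::join instead; rayon runs the closures on its thread pool and lets idle threads steal pending work, which is what keeps cores busy when only a few branches remain. A sequential cutoff near the leaves can still reduce overhead, but the right threshold is something to measure.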


Now that I've seen join, I'm wondering whether the ParallelIterator methods behave like that, too. If they do, I guess I could write the code like this:

use rand::thread_rng;
use rand::Rng;
use rayon::prelude::*;

fn main() {
    println!("{:?}", calculate(OpaqueState::default(), 3));
}

fn calculate(state: OpaqueState, depth: u8) -> OpaqueResult {
    if 0 == depth {
        OpaqueResult::one()
    } else {
        opaque_op(
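            opaque_iter(state)
                // par_bridge() already yields a ParallelIterator, so a following
                // .into_par_iter() is redundant and can be dropped.
                .par_bridge()
                .map(|state| calculate(state, depth - 1)),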
        )
    }
}

fn opaque_iter(_state: OpaqueState) -> impl Iterator<Item = OpaqueState> {
    // rand 0.8+ takes a range here; rand 0.7 accepted gen_range(0, 20) instead.
    (0..thread_rng().gen_range(0..20)).map(|_| OpaqueState::default())
}

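// u64 instead of u8: with up to 19 children per level the leaf count can exceed 255,
// and the u8 addition in opaque_op would panic on overflow in debug builds.
#[derive(Default, Debug)]
struct OpaqueResult(u64);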

impl OpaqueResult {
    fn one() -> Self {
        Self(1)
    }
}

fn opaque_op(t: impl ParallelIterator<Item = OpaqueResult>) -> OpaqueResult {
    t.reduce(
        || OpaqueResult::default(),
        |acc, item| OpaqueResult(acc.0 + item.0),
    )
}

#[derive(Default)]
struct OpaqueState(());


If your question is whether you can use a parallel iterator inside a parallel iterator, the answer is yes.


Thanks! That was very insightful. I didn't expect rayon to be that powerful. What a pleasant surprise!


In fact, parallel iterators are literally implemented with nested joins.
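To make that concrete, here is a toy map/reduce over a slice built from nothing but a join primitive: split the slice in half, process both halves in parallel, combine the results with an associative op, and fold sequentially once a piece is small. This is not rayon's actual code (rayon splits adaptively and schedules on a work-stealing pool); it is a sketch that assumes a hypothetical std-only join on std::thread::scope (Rust 1.63+) so it runs without the rayon crate. map_reduce and min_len are illustrative names; min_len loosely mirrors rayon's with_min_len adaptor, and the identity/op pair follows the same contract as the ParallelIterator::reduce call in the second code sample.

```rust
use std::thread;

// Hypothetical std-only stand-in for rayon::join (one scoped OS thread per
// call; rayon uses a work-stealing pool instead).
fn join<A, B, RA, RB>(a: A, b: B) -> (RA, RB)
where
    A: FnOnce() -> RA + Send,
    B: FnOnce() -> RB + Send,
    RA: Send,
    RB: Send,
{
    thread::scope(|s| {
        let handle = s.spawn(a);
        let rb = b();
        (handle.join().expect("joined closure panicked"), rb)
    })
}

// "Map, then reduce" built purely from nested join calls. As with
// ParallelIterator::reduce, identity() must be neutral for `op` and `op` must
// be associative, because the grouping depends on where the slice is split.
fn map_reduce<T, R, M, I, O>(items: &[T], min_len: usize, map_fn: &M, identity: &I, op: &O) -> R
where
    T: Sync,
    R: Send,
    M: Fn(&T) -> R + Sync,
    I: Fn() -> R + Sync,
    O: Fn(R, R) -> R + Sync,
{
    // max(1) prevents endless splitting of one-element slices.
    if items.len() <= min_len.max(1) {
        // Small piece: plain sequential fold on the current thread.
        items.iter().map(map_fn).fold(identity(), op)
    } else {
        // Large piece: split in half and let join run both halves, each of
        // which may split again.
        let (left, right) = items.split_at(items.len() / 2);
        let (a, b) = join(
            || map_reduce(left, min_len, map_fn, identity, op),
            || map_reduce(right, min_len, map_fn, identity, op),
        );
        op(a, b)
    }
}

fn main() {
    let data: Vec<u64> = (1..=1000).collect();
    // Sum of squares of 1..=1000.
    let total = map_reduce(&data, 64, &|x: &u64| x * x, &|| 0u64, &|a: u64, b: u64| a + b);
    println!("{total}"); // prints 333833500
}
```

The recursion here only stops at min_len, so parallelism reaches as deep as the data allows; nesting one such call inside the map_fn of another works the same way, which is the property the nested parallel iterators above rely on.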