Massive parallel computing with shared state

Hi community!

Can you advise me on a good approach to running massively parallel computations in a loop with shared state?

The basic idea is, in each loop iteration, to:

  1. Start (or reuse) a thread pool to run multiple computations with read-only access to the state
  2. Collect the computation results and modify the state accordingly

Here is a non-working code example to illustrate the idea:

use std::{sync::mpsc::{channel, Receiver, Sender}, thread};

struct State;
impl State {
    fn apply(&mut self, messages: Vec<Message>) {
        // [...]
    }
}

enum Message {}

fn job1(state: &State, results_sender: Sender<Message>) {
    // [...]
}
fn job2(state: &State, results_sender: Sender<Message>) {
    // [...]
}
fn job3(state: &State, results_sender: Sender<Message>) {
    // [...]
}

fn main() {
    let mut state = State;
    let (results_sender, results_receiver): (Sender<Message>, Receiver<Message>) = channel();
    let results_sender1 = results_sender.clone();
    let results_sender2 = results_sender.clone();
    let results_sender3 = results_sender.clone();
    loop {
        thread::spawn(|| job1(&state, results_sender1));
        thread::spawn(|| job2(&state, results_sender2));
        thread::spawn(|| job3(&state, results_sender3));
        state.apply(results_receiver.try_iter().collect());
    }
}

(This example does not show any mechanism for waiting for the threads to finish before state.apply is called.)

I have tried Rc, Mutex and other things, but I'm not sure how to do this properly.
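For comparison, here is a minimal sketch (not taken from the thread) of how the channel-based design above could be made to compile with scoped threads. State, Message, the Add variant and the job bodies are hypothetical placeholders. Because thread::scope joins every thread it spawned before returning, all results are already in the channel when try_iter() runs, which supplies the missing wait step without Rc or Mutex. Messages from different jobs can arrive in any order, so apply should not depend on that order.

```rust
use std::sync::mpsc::{channel, Sender};
use std::thread;

// Hypothetical stand-ins for the question's State, Message and jobs.
#[derive(Debug)]
struct State {
    total: i64,
}

enum Message {
    Add(i64),
}

impl State {
    fn apply(&mut self, messages: Vec<Message>) {
        for message in messages {
            match message {
                Message::Add(n) => self.total += n,
            }
        }
    }
}

// Jobs get read-only access to the state and report through their own Sender.
fn job1(state: &State, results_sender: Sender<Message>) {
    results_sender.send(Message::Add(state.total + 1)).unwrap();
}
fn job2(state: &State, results_sender: Sender<Message>) {
    results_sender.send(Message::Add(state.total * 2)).unwrap();
}

fn step(state: &mut State) {
    let (results_sender, results_receiver) = channel();
    let shared: &State = state; // immutable reborrow for the parallel phase
    thread::scope(|s| {
        let sender1 = results_sender.clone();
        let sender2 = results_sender.clone();
        s.spawn(move || job1(shared, sender1));
        s.spawn(move || job2(shared, sender2));
    }); // every job has finished here: the scope joins all its threads
    // All messages are already queued, so the non-blocking try_iter() sees them all.
    state.apply(results_receiver.try_iter().collect());
}

fn main() {
    let mut state = State { total: 1 };
    for _ in 0..3 {
        step(&mut state);
        println!("{:?}", state);
    }
}
```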

Thanks for your time!

So, this was basically how I structured a simulation framework I worked on previously. At its most basic, the loop consisted of:

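fn update(state: &mut State) {
    // Parallel phase: each sub-module gets a shared borrow of the state.
    let (update_a, update_b /* , ... */) = thread::scope(|s| {
        let update_a = s.spawn(|| sub_mod_a::update(&*state));
        let update_b = s.spawn(|| sub_mod_b::update(&*state));
        // ... one spawn per sub-module
        // Join inside the scope: the handles cannot outlive it.
        (update_a.join().unwrap(), update_b.join().unwrap())
    });
    // Serial phase: apply the results in a fixed order with a mutable borrow.
    update_a.apply(&mut *state);
    update_b.apply(&mut *state);
    // ...
}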

No need for Rc or anything else: just scoped threads running the updates with immutable borrows of the state. Each sub-module returned an opaque value (later a trait object) with an apply method, and those apply methods were then called in a fixed, serial order to apply the changes. The only places I used locks were a few bits and pieces that couldn't be done efficiently otherwise and did not contribute to the final result (things like debug flags, logs, etc.).
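A minimal sketch of the trait-object variant described above, assuming a hypothetical Update trait whose apply consumes the boxed result, and two hypothetical sub-modules (movement and gravity). The sub-modules are kept in a slice, spawned in a loop, and their results applied in slice order, so the final state does not depend on which thread finishes first.

```rust
use std::thread;

#[derive(Debug)]
struct State {
    position: i32,
    velocity: i32,
}

// Hypothetical trait for the opaque result each sub-module returns.
// The `Send` supertrait lets a boxed result travel back from its worker thread.
trait Update: Send {
    fn apply(self: Box<Self>, state: &mut State);
}

struct SetPosition(i32);
impl Update for SetPosition {
    fn apply(self: Box<Self>, state: &mut State) {
        state.position = self.0;
    }
}

struct SetVelocity(i32);
impl Update for SetVelocity {
    fn apply(self: Box<Self>, state: &mut State) {
        state.velocity = self.0;
    }
}

// Hypothetical sub-modules: each reads the state and returns its update.
fn movement(state: &State) -> Box<dyn Update> {
    Box::new(SetPosition(state.position + state.velocity))
}
fn gravity(state: &State) -> Box<dyn Update> {
    Box::new(SetVelocity(state.velocity - 1))
}

type SubModule = fn(&State) -> Box<dyn Update>;

fn update(state: &mut State, modules: &[SubModule]) {
    let shared: &State = state; // read-only view for the parallel phase
    let updates: Vec<Box<dyn Update>> = thread::scope(|s| {
        // Spawn everything first so the sub-modules really run concurrently...
        let handles: Vec<_> = modules
            .iter()
            .map(move |&module| s.spawn(move || module(shared)))
            .collect();
        // ...then wait for all of them, keeping the order of `modules`.
        handles.into_iter().map(|h| h.join().unwrap()).collect()
    });
    // Serial phase: apply in the fixed order of `modules`.
    for u in updates {
        u.apply(state);
    }
}

fn main() {
    let modules: [SubModule; 2] = [movement, gravity];
    let mut state = State { position: 0, velocity: 3 };
    for _ in 0..3 {
        update(&mut state, &modules);
        println!("{:?}", state);
    }
}
```

Collecting the handles into a Vec before joining matters: joining inside the same lazy iterator chain would start each thread only after the previous one had finished.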

Worked great. My only regret about the system is that I didn't use Arc and immutable data structures for the whole thing, so that I could cheaply clone the state. It turned out that, for some workloads, logging all the results took a sizeable proportion of the runtime, and that blocked the next update step.
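One way to get cheap snapshots like the ones described above, sketched with std only: keep the state in an Arc, hand a clone of the Arc to a logging thread, and update through Arc::make_mut, which copies the state only if another Arc (here, the logger's snapshot) still points to it. State, advance and the log format are hypothetical. With a plain Vec that copy is a full clone, so this is only a stand-in for persistent immutable data structures, which would also make the copy cheap.

```rust
use std::sync::mpsc::channel;
use std::sync::Arc;
use std::thread;

#[derive(Clone, Debug)]
struct State {
    step: u64,
    values: Vec<i64>,
}

// Hypothetical simulation step: mutates the state in place.
fn advance(state: &mut State) {
    state.step += 1;
    for v in state.values.iter_mut() {
        *v *= 2;
    }
}

fn run(steps: u64) -> (Arc<State>, Vec<String>) {
    let (snapshot_sender, snapshot_receiver) = channel::<Arc<State>>();
    // The logger formats snapshots at its own pace on a separate thread.
    let logger = thread::spawn(move || {
        snapshot_receiver
            .iter()
            .map(|s| format!("step {}: {:?}", s.step, s.values))
            .collect::<Vec<String>>()
    });

    let mut state = Arc::new(State { step: 0, values: vec![1, 2, 3] });
    for _ in 0..steps {
        // Sending a snapshot only bumps the reference count; no State is copied.
        snapshot_sender.send(Arc::clone(&state)).unwrap();
        // make_mut clones the State only if the logger still holds the snapshot,
        // so the next step never waits for logging to finish.
        advance(Arc::make_mut(&mut state));
    }
    drop(snapshot_sender); // closing the channel ends the logger's loop
    let log = logger.join().unwrap();
    (state, log)
}

fn main() {
    let (state, log) = run(3);
    for line in &log {
        println!("{line}");
    }
    println!("final: {:?}", state);
}
```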


Hello @DanielKeep,

Thanks for sharing this! I hadn't used scoped threads before, and they look very useful!

I wrote some working code:

use std::{thread, time::Duration};

#[derive(Debug)]
struct State { values: [i32; 2] }
struct JobResult { i: usize, value: i32 }
impl JobResult {
    fn apply(&self, state: &mut State) {
        state.values[self.i] = self.value;
    }
}

fn job_a(state: &State) -> JobResult {
    JobResult { i: 0, value: state.values[0] * 2 }
}
fn job_b(state: &State) -> JobResult {
    JobResult { i: 1, value: state.values[1] * 2 }
}
fn main() {
    let mut state = State { values: [1, 1] };

    loop {
        let (update_a, update_b) = thread::scope(|scope| {
            let update_a = scope.spawn(|| job_a(&state));
            let update_b = scope.spawn(|| job_b(&state));
            (update_a.join(), update_b.join())
        });

        update_a.unwrap().apply(&mut state);
        update_b.unwrap().apply(&mut state);

        dbg!(&state);
        thread::sleep(Duration::from_millis(500));
    }
}

Works great. The difference from your example is the deref (&*state). How do you use it?

That was just to show where I was using immutable and mutable borrows; the explicit reborrows are probably not actually necessary. I wasn't trying to write directly compilable code, I just wanted to give you the gist.
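To illustrate the point about the derefs: in ordinary function-call position, &*state and &mut *state are explicit reborrows, while passing state directly coerces &mut State to &State or reborrows implicitly, so both styles below behave the same. State, read and write are hypothetical.

```rust
struct State {
    n: i32,
}

fn read(state: &State) -> i32 {
    state.n
}

fn write(state: &mut State, n: i32) {
    state.n = n;
}

fn update(state: &mut State) -> i32 {
    // Explicit reborrows, as written in the sketch.
    let a = read(&*state);
    write(&mut *state, a + 1);
    // The same calls without them: the compiler coerces `&mut State` to
    // `&State` for `read` and reborrows implicitly for `write`, so `state`
    // is still usable afterwards.
    let b = read(state);
    write(state, b + 1);
    state.n
}

fn main() {
    let mut state = State { n: 1 };
    println!("{}", update(&mut state)); // prints 3
}
```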

OK, thanks!
