bux
May 20, 2024, 8:29am
Hi community!
Can you advise me on a good approach for running a looped, massively parallel computation over a shared state?
The basic idea is to do the following in each loop iteration:
Start (or reuse) a thread pool to run multiple computations with read-only access to the state.
Collect the computations' results and modify the state accordingly.
Here is a non-working code example to illustrate the idea:
```rust
use std::{
    sync::mpsc::{channel, Receiver, Sender},
    thread,
};

struct State;

impl State {
    fn apply(&mut self, messages: Vec<Message>) {
        // [...]
    }
}

enum Message {}

fn job1(state: &State, results_sender: Sender<Message>) {
    // [...]
}

fn job2(state: &State, results_sender: Sender<Message>) {
    // [...]
}

fn job3(state: &State, results_sender: Sender<Message>) {
    // [...]
}

fn main() {
    let mut state = State;
    let (results_sender, results_receiver): (Sender<Message>, Receiver<Message>) = channel();
    let results_sender1 = results_sender.clone();
    let results_sender2 = results_sender.clone();
    let results_sender3 = results_sender.clone();
    loop {
        // Does not compile: `thread::spawn` requires `'static` closures, so they
        // cannot borrow `state`, and each sender is moved out on the first iteration.
        thread::spawn(|| job1(&state, results_sender1));
        thread::spawn(|| job2(&state, results_sender2));
        thread::spawn(|| job3(&state, results_sender3));
        state.apply(results_receiver.try_iter().collect());
    }
}
```
(This example does not show the mechanism for waiting until the threads finish before calling `state.apply`.)
I tried `Rc`, `Mutex` and other things, but I'm not sure how to do this properly.
Thanks for your time!
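One way to get the missing "wait for the threads, then apply" step is `std::thread::scope` (stable since Rust 1.63): every thread spawned inside the scope is joined before `scope` returns, and scoped threads may borrow non-`'static` data such as `state`. The sketch below keeps the question's channel design; the `total` field, the `Message::Add` variant and the job bodies are hypothetical placeholders, since the original leaves them empty.

```rust
use std::sync::mpsc::{channel, Sender};
use std::thread;

#[derive(Debug, Default)]
struct State {
    total: i64, // hypothetical field
}

// Hypothetical message type; the question's `Message` enum is empty.
#[derive(Debug)]
enum Message {
    Add(i64),
}

impl State {
    fn apply(&mut self, messages: Vec<Message>) {
        for message in messages {
            match message {
                Message::Add(n) => self.total += n,
            }
        }
    }
}

// Hypothetical jobs: each reads the state and sends one result.
fn job1(state: &State, results: Sender<Message>) {
    results.send(Message::Add(state.total + 1)).unwrap();
}

fn job2(_state: &State, results: Sender<Message>) {
    results.send(Message::Add(2)).unwrap();
}

fn step(state: &mut State) {
    let (results_sender, results_receiver) = channel();
    // Shared reborrow used only during the parallel phase.
    let shared: &State = &*state;
    thread::scope(|scope| {
        let tx1 = results_sender.clone();
        let tx2 = results_sender.clone();
        scope.spawn(move || job1(shared, tx1));
        scope.spawn(move || job2(shared, tx2));
    }); // all scoped threads have been joined at this point
    // Drop the last sender so `iter()` stops once the buffered messages are read.
    drop(results_sender);
    state.apply(results_receiver.iter().collect());
}

fn main() {
    let mut state = State::default();
    for _ in 0..3 {
        step(&mut state);
        println!("{state:?}");
    }
}
```

Because the scope has already joined every job, `try_iter()` would collect the same messages here; messages may arrive in any order, so `apply` should not depend on their order (or the messages should carry enough information to sort them).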
So, this was basically how I structured a simulation framework I worked on previously. At its most basic, the loop consisted of:
```rust
fn update(state: &mut State) {
    let (update_a, update_b, ..) = thread::scope(|s| {
        let update_a = s.spawn(|| sub_mod_a::update(&*state));
        let update_b = s.spawn(|| sub_mod_b::update(&*state));
        ..
        // Join inside the scope: the handles cannot outlive it.
        (update_a.join().unwrap(), update_b.join().unwrap())
    });
    update_a.apply(&mut *state);
    update_b.apply(&mut *state);
    ..
}
```
No need for `Rc` or anything else. Just using scoped threads to run the updates with immutable borrows of the state. Each sub-module would return an opaque value (later a trait object) with an `apply` method, and these were then called in a fixed, serialized order to apply the changes. The only place I used locks was for a few bits and pieces that couldn't be done efficiently otherwise and did not contribute to the final result (things like debug flags, logs, etc.).
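A compilable sketch of that structure, assuming the opaque value is a boxed trait object. The `Update` trait, the `SetA`/`SetB` types and the `SUB_MODULES` table are hypothetical; the framework's real types aren't shown in this thread. Every sub-module reads the same pre-update state, and the results are applied in the table's fixed order, so the outcome does not depend on thread scheduling.

```rust
use std::thread;

#[derive(Debug)]
struct State {
    a: i32,
    b: i32,
}

/// Hypothetical trait for the "opaque value with an `apply` method".
/// `Send` is a supertrait so boxed updates can be returned from threads.
trait Update: Send {
    fn apply(self: Box<Self>, state: &mut State);
}

struct SetA(i32);
impl Update for SetA {
    fn apply(self: Box<Self>, state: &mut State) {
        state.a = self.0;
    }
}

struct SetB(i32);
impl Update for SetB {
    fn apply(self: Box<Self>, state: &mut State) {
        state.b = self.0;
    }
}

// Hypothetical sub-module update functions: read-only access to the state.
fn update_a(state: &State) -> Box<dyn Update> {
    Box::new(SetA(state.a + state.b))
}

fn update_b(state: &State) -> Box<dyn Update> {
    Box::new(SetB(state.a * 10))
}

// Fixed order in which updates are computed and then applied.
const SUB_MODULES: [fn(&State) -> Box<dyn Update>; 2] = [update_a, update_b];

fn update(state: &mut State) {
    let shared: &State = &*state;
    // Parallel phase: every sub-module sees the same snapshot.
    let updates: Vec<Box<dyn Update>> = thread::scope(|s| {
        let mut handles = Vec::new();
        for f in SUB_MODULES {
            handles.push(s.spawn(move || f(shared)));
        }
        handles.into_iter().map(|h| h.join().unwrap()).collect()
    });
    // Serial phase: apply the results in the table's order.
    for u in updates {
        u.apply(state);
    }
}

fn main() {
    let mut state = State { a: 1, b: 2 };
    for _ in 0..3 {
        update(&mut state);
        println!("{state:?}");
    }
}
```

Starting from `a = 1, b = 2`, one call gives `a = 3, b = 10`: `update_b` sees the old `a`, not the value `update_a` just computed.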
Worked great. My only regret about the system is that I wish I'd used `Arc` and immutable data structures for the whole thing, so I could cheaply clone the state. It turned out that, for some workloads, logging all the results took a sizeable proportion of the runtime, and that blocked the next update step.
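A sketch of that idea, assuming the bulky part of the state sits behind an `Arc` and each step builds a new value instead of mutating in place. Cloning a snapshot then only bumps a reference count, so it can be handed to a logging thread over a channel without blocking the next step. The `State` layout, `advance` and the logger are hypothetical stand-ins; `Arc::make_mut` is another option if you want copy-on-write rather than always rebuilding.

```rust
use std::sync::mpsc::channel;
use std::sync::Arc;
use std::thread;

// Hypothetical state: the bulky data lives behind an `Arc`, so cloning a
// snapshot is cheap and never copies the vector.
#[derive(Clone, Debug)]
struct State {
    step: u64,
    values: Arc<Vec<i64>>,
}

// Builds the next state without mutating the shared data in place.
fn advance(state: &State) -> State {
    let next: Vec<i64> = state.values.iter().map(|v| v * 2).collect();
    State { step: state.step + 1, values: Arc::new(next) }
}

fn run(steps: u64) -> (State, Vec<u64>) {
    let (log_tx, log_rx) = channel::<State>();
    // Background logger: receives snapshots off the main loop.
    let logger = thread::spawn(move || {
        let mut logged = Vec::new();
        for snapshot in log_rx {
            // Stand-in for slow logging or serialization work.
            logged.push(snapshot.step);
        }
        logged
    });

    let mut state = State { step: 0, values: Arc::new(vec![1, 2, 3]) };
    for _ in 0..steps {
        state = advance(&state);
        // Cheap: clones the `Arc`, not the `Vec`.
        log_tx.send(state.clone()).unwrap();
    }
    drop(log_tx); // closes the channel so the logger's loop ends
    let logged = logger.join().unwrap();
    (state, logged)
}

fn main() {
    let (state, logged) = run(3);
    println!("{state:?} logged steps: {logged:?}");
}
```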
bux
May 20, 2024, 9:38am
Hello @DanielKeep,
Thanks for sharing this! I hadn't used scoped threads before, and they look very useful.
I wrote some working code:
```rust
use std::{thread, time::Duration};

#[derive(Debug)]
struct State {
    values: [i32; 2],
}

struct JobResult {
    i: usize,
    value: i32,
}

impl JobResult {
    fn apply(&self, state: &mut State) {
        state.values[self.i] = self.value;
    }
}

fn job_a(state: &State) -> JobResult {
    JobResult { i: 0, value: state.values[0] * 2 }
}

fn job_b(state: &State) -> JobResult {
    JobResult { i: 1, value: state.values[1] * 2 }
}

fn main() {
    let mut state = State { values: [1, 1] };
    loop {
        let (update_a, update_b) = thread::scope(|scope| {
            let update_a = scope.spawn(|| job_a(&state));
            let update_b = scope.spawn(|| job_b(&state));
            (update_a.join(), update_b.join())
        });
        update_a.unwrap().apply(&mut state);
        update_b.unwrap().apply(&mut state);
        dbg!(&state);
        thread::sleep(Duration::from_millis(500));
    }
}
```
Works great. The difference from your example is the deref (`&*state`). Why do you use it?
That was just to show where I was using immutable and mutable borrows. They're probably not actually necessary. I wasn't trying to write directly compilable code; I just wanted to give you the gist.
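For reference, a small sketch of what the explicit reborrow does (all names here are hypothetical). When `state` is a `&mut State`, as in the `update` function above, `&*state` produces a shared `&State`; shared references are `Copy`, so several scoped threads can use one at the same time, and once they are joined the `&mut` is usable again. Writing `let shared: &State = state;` also works, because the `&mut` is implicitly reborrowed where a `&State` is expected. With an owned `State`, as in the working example, plain `&state` does the same job.

```rust
use std::thread;

#[derive(Debug)]
struct State {
    value: i32,
}

fn read(state: &State) -> i32 {
    state.value
}

// `state` is a mutable reference here, as in the `update` sketch.
fn step_with_ref(state: &mut State) -> i32 {
    // `&*state` reborrows the `&mut State` as a shared `&State`.
    let shared: &State = &*state;
    let sum = thread::scope(|s| {
        let a = s.spawn(move || read(shared));
        let b = s.spawn(move || read(shared));
        a.join().unwrap() + b.join().unwrap()
    });
    // The shared reborrow has ended, so mutation is allowed again.
    state.value = sum;
    sum
}

// With an owned `State`, plain `&state` is enough.
fn step_owned(mut state: State) -> State {
    let sum = thread::scope(|s| {
        let a = s.spawn(|| read(&state));
        let b = s.spawn(|| read(&state));
        a.join().unwrap() + b.join().unwrap()
    });
    state.value = sum;
    state
}

fn main() {
    let mut state = State { value: 3 };
    step_with_ref(&mut state);
    let state = step_owned(state);
    println!("{state:?}");
}
```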
system
Closed
August 18, 2024, 9:54am
This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.