Hey, guys.
I have a service that runs multiple async tasks concurrently using tokio.
The main task takes in events and runs some common logic that transforms each event into a `Vec` of inputs to be processed.
In that task I hold multiple executors (with state) that take that `Vec`, process it, and output some events. The executors are purely CPU-bound and compute heavy: they need nothing but their own state and the input data.
I would like to run them in parallel, since each executor takes the same data, does something different with it, and emits some sort of event.
Here is the pseudocode:
```rust
struct Executor1 {
    state: ExecutorState,
}

impl Executor1 {
    // `Event` is a placeholder for whatever the executor outputs.
    fn run(&mut self, inputs: &[Input]) -> Vec<Event> {
        // CPU-heavy workload
    }
}

// more executors
// ...

/// This processor runs forever inside a `tokio::spawn`ed task.
struct Processor {
    // ...
    executor_1: Executor1,
    executor_2: Executor2,
    // ...
}

impl Processor {
    // This function runs whenever the processor receives input,
    // which is every frame.
    async fn process_input(&mut self, new_data: Data) {
        // ... do some common stuff to the data to create a Vec of inputs ...
        // run the executors in parallel
    }
}
```
My problem is that running the executors requires a mutable reference to each of them, and they also need a reference to the input.
The easiest solution is to wrap the executors in `Arc<Mutex<_>>` and pass them to `rayon::spawn`, but at any given time only one instance of each executor is running, so the mutex is somewhat redundant: the lock will always be free when it is taken.
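A minimal sketch of that `Arc<Mutex<_>>` approach, assuming hypothetical `Input`/`Event` types (plain `u64` here) and a made-up running-sum state for the executor. `std::thread::spawn` stands in for `rayon::spawn` so the sketch runs with the standard library alone; both require a `'static` closure, which is why the executor and the inputs sit behind `Arc`s. Because only one thread locks a given executor per frame, the lock is never contended; with `std::sync::Mutex` an uncontended lock/unlock is typically just a couple of atomic operations and no system call, so the overhead is small but not zero.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

// Hypothetical stand-ins for the post's input and event types.
type Input = u64;
type Event = u64;

struct Executor1 {
    total: u64, // hypothetical state: running sum across frames
}

impl Executor1 {
    fn run(&mut self, inputs: &[Input]) -> Vec<Event> {
        // Stand-in for the CPU-heavy work.
        inputs
            .iter()
            .map(|x| {
                self.total += *x;
                self.total
            })
            .collect()
    }
}

fn process_frame(executor: &Arc<Mutex<Executor1>>, inputs: &Arc<Vec<Input>>) -> Vec<Event> {
    // Cloning the Arcs only bumps refcounts; the closure can then be 'static.
    let executor = Arc::clone(executor);
    let inputs = Arc::clone(inputs);
    // thread::spawn stands in for rayon::spawn here.
    let handle = thread::spawn(move || {
        // Nothing else holds this lock during the frame, so it is uncontended.
        let mut guard = executor.lock().unwrap();
        guard.run(&inputs)
    });
    handle.join().unwrap()
}

fn main() {
    let executor = Arc::new(Mutex::new(Executor1 { total: 0 }));
    println!("{:?}", process_frame(&executor, &Arc::new(vec![1, 2, 3]))); // [1, 3, 6]
    println!("{:?}", process_frame(&executor, &Arc::new(vec![4]))); // [10]
}
```

In this sketch the per-frame cost is dominated by spawning an OS thread, not by the lock; a pool such as rayon's avoids that part.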
I read @alice's great post on running CPU-bound workloads with tokio (Async: What is blocking? – Alice Ryhl).
In the article, @alice only uses `rayon::spawn` with no state and returns the output through a oneshot channel.
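One way to extend that spawn-plus-channel pattern to stateful executors, sketched under assumptions: move each executor out of the processor into its task and send it back together with its output, so ownership round-trips and no `Mutex` is needed. The input is shared through an `Arc`, so it is never cloned. `Input`/`Event` (plain `u64`), the executors' state, and `Option` fields on `Processor` are all hypothetical; `std::thread::spawn` and `std::sync::mpsc` stand in for `rayon::spawn` and `tokio::sync::oneshot` so the sketch runs without crates.

```rust
use std::sync::{mpsc, Arc};
use std::thread;

// Hypothetical stand-ins for the post's input and event types.
type Input = u64;
type Event = u64;

struct Executor1 {
    total: u64, // hypothetical state: running sum across frames
}

struct Executor2 {
    frames: u64, // hypothetical state: number of frames seen
}

impl Executor1 {
    fn run(&mut self, inputs: &[Input]) -> Vec<Event> {
        inputs.iter().map(|x| { self.total += *x; self.total }).collect()
    }
}

impl Executor2 {
    fn run(&mut self, inputs: &[Input]) -> Vec<Event> {
        self.frames += 1;
        inputs.iter().map(|x| x * self.frames).collect()
    }
}

struct Processor {
    // `Option` lets each executor be moved out for the duration of a frame.
    executor_1: Option<Executor1>,
    executor_2: Option<Executor2>,
}

impl Processor {
    fn process_input(&mut self, inputs: Vec<Input>) -> (Vec<Event>, Vec<Event>) {
        // Arc::new moves the Vec (no element copies); Arc::clone bumps a refcount.
        let inputs = Arc::new(inputs);
        let mut ex1 = self.executor_1.take().expect("executor_1 is already running");
        let mut ex2 = self.executor_2.take().expect("executor_2 is already running");
        let (tx1, rx1) = mpsc::channel();
        let (tx2, rx2) = mpsc::channel();

        // Both closures are 'static because they own the executor and an Arc handle.
        let in1 = Arc::clone(&inputs);
        thread::spawn(move || {
            let events = ex1.run(&in1);
            let _ = tx1.send((ex1, events)); // hand the executor back with its output
        });
        let in2 = Arc::clone(&inputs);
        thread::spawn(move || {
            let events = ex2.run(&in2);
            let _ = tx2.send((ex2, events));
        });

        // In async code these would be `.await`s on tokio oneshot receivers.
        let (ex1, out1) = rx1.recv().expect("executor_1 worker panicked");
        let (ex2, out2) = rx2.recv().expect("executor_2 worker panicked");
        self.executor_1 = Some(ex1);
        self.executor_2 = Some(ex2);
        (out1, out2)
    }
}

fn main() {
    let mut processor = Processor {
        executor_1: Some(Executor1 { total: 0 }),
        executor_2: Some(Executor2 { frames: 0 }),
    };
    println!("{:?}", processor.process_input(vec![1, 2, 3])); // ([1, 3, 6], [1, 2, 3])
    println!("{:?}", processor.process_input(vec![4])); // ([10], [8])
}
```

A `tokio::sync::oneshot::Receiver` is itself a future, so in the real `async fn process_input` the blocking `recv()` calls would become awaits and the tokio worker thread stays free while rayon does the work.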
In this post @alice mentions a couple of options for solving a related problem. I was wondering how to choose the best option for my scenario.
I thought about using `rayon::scope`, but as @alice mentions in the post, it blocks the current task and the tokio worker thread it runs on, which is very bad. I thought maybe `block_in_place` would solve the problem, but I don't think I can pass it a reference to `self`:
```rust
impl Processor {
    // This function runs whenever the processor receives input,
    // which is every frame.
    async fn process_input(&mut self, new_data: Data) {
        // ... do some common stuff to the data to create a Vec of inputs ...
        let mut executor_1_output = Vec::new();
        let mut executor_2_output = Vec::new();
        // run the executors in parallel
        tokio::task::block_in_place(|| {
            // Does not compile: rayon::spawn requires a 'static closure,
            // so these closures cannot borrow `self`, `inputs`, or the output vecs.
            rayon::spawn(|| {
                executor_1_output.extend(self.executor_1.run(&inputs));
            });
            rayon::spawn(|| {
                executor_2_output.extend(self.executor_2.run(&inputs));
            });
        });
    }
}
```
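For comparison with the attempt above, a minimal sketch of the borrowing alternative: a scoped API joins its threads before returning, so its closures may borrow `self` and the inputs instead of needing `'static`. It uses `std::thread::scope` (Rust 1.63+) as a std-only stand-in for `rayon::scope`; the executor types, their state, and `Input`/`Event` (plain `u64`) are hypothetical. With rayon, the body would roughly be `rayon::join(|| ex1.run(inputs), || ex2.run(inputs))`, wrapped in `tokio::task::block_in_place(|| ...)`, whose closure does not need to be `'static`.

```rust
use std::thread;

// Hypothetical stand-ins for the post's input and event types.
type Input = u64;
type Event = u64;

struct Executor1 {
    total: u64, // hypothetical state: running sum across frames
}

struct Executor2 {
    frames: u64, // hypothetical state: number of frames seen
}

impl Executor1 {
    fn run(&mut self, inputs: &[Input]) -> Vec<Event> {
        inputs.iter().map(|x| { self.total += *x; self.total }).collect()
    }
}

impl Executor2 {
    fn run(&mut self, inputs: &[Input]) -> Vec<Event> {
        self.frames += 1;
        inputs.iter().map(|x| x * self.frames).collect()
    }
}

struct Processor {
    executor_1: Executor1,
    executor_2: Executor2,
}

impl Processor {
    fn process_input(&mut self, inputs: Vec<Input>) -> (Vec<Event>, Vec<Event>) {
        // Split the borrow of `self` into two disjoint `&mut` field borrows.
        let (ex1, ex2) = (&mut self.executor_1, &mut self.executor_2);
        let inputs = &inputs[..]; // shared borrow: both threads read the same data
        // The scope joins both threads before returning, which is what
        // allows the closures to borrow rather than own their data.
        thread::scope(|s| {
            let h1 = s.spawn(move || ex1.run(inputs));
            let h2 = s.spawn(move || ex2.run(inputs));
            (h1.join().unwrap(), h2.join().unwrap())
        })
    }
}

fn main() {
    let mut processor = Processor {
        executor_1: Executor1 { total: 0 },
        executor_2: Executor2 { frames: 0 },
    };
    println!("{:?}", processor.process_input(vec![1, 2, 3])); // ([1, 3, 6], [1, 2, 3])
    println!("{:?}", processor.process_input(vec![4])); // ([10], [8])
}
```

Two caveats apply to the tokio version: `block_in_place` panics on a `current_thread` runtime, and `std::thread::scope` spawns fresh OS threads every frame, which rayon's pool avoids.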
Again, my question is: what do you guys think is the best option in this situation, considering that:
- only one executor of each type runs at a given time, which means a `Mutex` is just overhead (or is it, if it is never contended?);
- I don't want to clone my input, as it can be quite heavy (a good amount of data);
- this happens on every input (which arrives every "frame"), so I want as little overhead as possible.