Running a CPU-bound workload with mutable state

Hey, guys.
I have a service with multiple async tasks running concurrently on tokio.
The main task takes in events and runs some common logic that transforms each event into a vec to be processed.

In that task I hold multiple executors (each with its own state) which take that vec and process it, outputting some events. The executors are purely CPU-bound workloads: they need nothing but their own state and the input data, and they are compute-heavy.

I would like to run them in parallel, since each executor takes the same data, does something different with it, and spits out some sort of event.

Here is the pseudocode:

struct Executor1 {
    state: ExecutorState,
}

impl Executor1 {
    fn run(&mut self, inputs: &[Input]) -> Vec<Event> {
        // cpu-workload
    }
}

// more executors
...

/// this processor runs forever in a tokio::spawn 
struct Processor {
    ...
    executor_1: Executor1,
    executor_2: Executor2,
    ...
}

impl Processor {

     // this function runs when processor receives input
     // which is every frame
     async fn process_input(&mut self, new_data: Data) {
      //... do some common stuff to data to create a vec of inputs... 

     // run executors in parallel
     }
}

My problem is that running the executors requires a mutable reference to each of them, and they also need a reference to the input.

The easiest solution is to wrap each executor in an Arc<Mutex<..>> and pass it to rayon::spawn, but since only one instance of each executor runs at any given time, the mutex is kind of redundant: the lock will always be uncontended.

I read @alice's great post on how to run CPU-bound workloads in tokio (Async: What is blocking? – Alice Ryhl).
In the article, @alice only uses rayon::spawn with no captured state, returning the output through a oneshot channel.

In this post @alice mentions a couple of options for solving a related problem. I was wondering how to go about choosing the best option in my scenario.

I thought about using rayon::scope, but as @alice mentions in the post, it blocks the current task and the tokio runtime, which is very bad. I thought maybe block_in_place would solve the problem, but I don't think I can pass it a reference to self.

impl Processor {

    // this function runs when the processor receives input,
    // which is every frame
    async fn process_input(&mut self, new_data: Data) {
        // ... do some common stuff to data to create a vec of inputs ...

        let mut executor_1_output = Vec::new();
        let mut executor_2_output = Vec::new();

        // run executors in parallel
        // (note: this doesn't compile as-is, because rayon::spawn
        // requires a 'static closure and so can't borrow self or the
        // local output vecs)
        tokio::task::block_in_place(|| {
            rayon::spawn(|| {
                executor_1_output.extend(self.executor_1.run(&inputs));
            });
            rayon::spawn(|| {
                executor_2_output.extend(self.executor_2.run(&inputs));
            });
        });
    }
}

Again, my question is: what do you think is the best option in this situation, considering:

  1. Only one executor of each type runs at a given time, which means the Mutex is pure overhead (or is it, if it's never contended?).
  2. I don't want to clone my input, as it can be quite heavy (a good amount of data).
  3. This happens on every input (i.e. every "frame"), so I want the least amount of overhead.

In your situation, I would use a bounded MPSC channel to send work to a thread that runs all the time. Something like:

struct Executor1 {
    state: ExecutorState,
}

impl Executor1 {
    fn run_once(&mut self, inputs: Vec<Input>) -> Output {
        // cpu-workload
    }
}

fn spawn_executor1(
    queue_length: usize,
    state: ExecutorState,
) -> (mpsc::Sender<(Vec<Input>, oneshot::Sender<Output>)>, std::thread::JoinHandle<()>) {
    let (tx, mut rx) = mpsc::channel(queue_length);
    let mut executor1 = Executor1 { state };

    let handle = std::thread::spawn(move || {
        while let Some((inputs, response)) = rx.blocking_recv() {
            let _ = response.send(executor1.run_once(inputs));
        }
    });

    (tx, handle)
}

Then you can send work to the executor and receive the result via the oneshot channel. If the executor is busy and the queue is full, the send will wait, giving you backpressure.


Yes, this would work, but it's kind of what I want to avoid: spinning up a long-running thread basically just to run a function. Also, this requires copying the whole input.

I am wondering if maybe all I need to do is run the Processor inside spawn_blocking,
which lets the other tasks run without blocking the tokio runtime, and then I can use rayon::scope with rayon::spawn. What do you think?

You can call rayon::scope() inside block_in_place() and thus have the use of captured variables.

However, as per Rayon's documentation, you should try to use rayon::join() or parallel iterators instead of spawn() to express your parallelism more efficiently, at which point you don't need the scope.


Can I call rayon::scope from inside block_in_place? Doesn't everything in block_in_place require move? I think I'm missing something.

The problem with rayon::join is that it only allows two closures, and I have multiple (a static number of) executors. Does recursing here make sense?

And do you think that, in terms of performance (or even overall correctness), it's better to call block_in_place each time rather than once on the Processor when we run tokio::spawn?

It requires copying the Vec, which is 3 pointers in size, but not the whole input (since the Vec points to stuff on the heap). It also requires copying Output, but you can reduce the cost of that by making it Box<T> instead of T.

No. That would only be true if block_in_place had a + 'static bound on the provided function, which it does not. And if that were true, then block_in_place would not have very much reason to exist, since it could only do things that spawn_blocking can also do.

It makes sense to use multiple nested join() calls, yes. Each one should try to split the workload roughly in half.

The big difference here is what is provided to the caller — whether it works as a simple &mut self method and consumes no resources when idle, or whether there is a long-running thread waiting for work. Either one might be better for performance, depending on how the Processor is used.


Thanks so much for the help!
This is my current pseudocode, for anyone wondering.

tokio::task::block_in_place(|| {
    let mut events = OutputEvents::new(); // holds x vecs
    rayon::scope(|s| {
        s.spawn(|_| {
            events.executor1_output.extend(self.executor_1.run(&inputs));
        });
        s.spawn(|_| {
            events.executor2_output.extend(self.executor_2.run(&inputs));
        });
    });

    if let Err(e) = tx.send(events) {
        tracing::error!(e = ?e, "failed to send observer events");
    }
});

Will switch to using join after we profile.

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.