Running two fields' functions in parallel

I have a struct that contains a root node and two child nodes, each of which is a struct with a process method:

pub struct Split <I, A: Clone, B, C, R: ProcessNode<I, A>, D: ProcessNode<A, B>, E: ProcessNode<A, C>> {
    _marker: PhantomData<(I, A, B, C)>,
    root_node: R,
    node1: D,
    node2: E
}

ProcessNode is just a trait saying that a node can turn an A into a B (through &mut self, so a node may keep state):

pub trait ProcessNode<A, B> {
    fn process(&mut self, input: A) -> B;
}

I also implement ProcessNode for Split: it takes an input I, passes it through root_node to get an A, and outputs (B, C). Currently my implementation looks like this:

impl <I, A: Clone, B, C, R: ProcessNode<I, A>, D: ProcessNode<A, B>, E: ProcessNode<A, C>>ProcessNode<I, (B, C)> for Split<I, A, B, C, R, D, E> {
    #[inline]
    fn process(&mut self, input: I) -> (B, C) {
        let intermediate = self.root_node.process(input);
        // Run both nodes on intermediate
        (self.node1.process(intermediate.clone()), self.node2.process(intermediate))
    }
}

but I want to run both nodes in parallel on separate threads. I tried this:

impl <I, A: Clone + Send, B: Send, C: Send, R: ProcessNode<I, A>, D: ProcessNode<A, B>, E: ProcessNode<A, C>>ProcessNode<I, (B, C)> for Split<I, A, B, C, R, D, E> {
    #[inline]
    fn process(&mut self, input: I) -> (B, C) {
        let intermediate1 = self.root_node.process(input);
        let intermediate2 = intermediate1.clone();
        // Run both nodes in parallel
        let out1 = thread::spawn(|| self.node1.process(intermediate1));
        let out2 = thread::spawn(|| self.node2.process(intermediate2));
        (out1.join().unwrap(), out2.join().unwrap())
    }
}

But this doesn't compile: the compiler says everything must be safe to send between threads, and that doesn't sound right to me. Is this the wrong way to do it? The next problem is that each thread needs its own &mut: one to node1 and one to node2. Is there a good way to do this without cloning?

You can do it with rayon or crossbeam.

The Send bounds don't necessarily mean that you need to clone anything. Even to send a &mut T reference between threads, you need a T: Send bound. After adding the necessary Send bounds (which can't be avoided), you'll run into lifetime problems, though: thread::spawn requires a 'static closure, so the closure can't borrow from self. You'll need some kind of "scoped" spawn function that can handle non-'static data; the typical solution here is crossbeam::scope, which spawns actual OS threads.
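To illustrate that the Send bounds don't force any cloning, here is a minimal sketch using std::thread::scope, the standard library's own scoped-thread API (stable since Rust 1.63), in place of crossbeam::scope. The Pair struct and fill_both function are hypothetical. Two threads each receive a &mut to a different field; this is allowed because Vec<u32> is Send (so &mut Vec<u32> is Send too) and the scope guarantees both threads have finished before the borrows end.

```rust
use std::thread;

// Hypothetical struct with two independently mutable fields.
struct Pair {
    left: Vec<u32>,
    right: Vec<u32>,
}

fn fill_both(pair: &mut Pair) {
    // Split the single &mut Pair into two disjoint &mut borrows.
    let Pair { left, right } = pair;
    thread::scope(|s| {
        // Each move closure owns exactly one &mut Vec<u32>; nothing is cloned.
        s.spawn(move || left.extend(1..=3));
        s.spawn(move || right.extend(10..=12));
    }); // thread::scope joins every spawned thread before returning
}

fn main() {
    let mut pair = Pair { left: Vec::new(), right: Vec::new() };
    fill_both(&mut pair);
    println!("{:?} {:?}", pair.left, pair.right); // prints "[1, 2, 3] [10, 11, 12]"
}
```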

Alternatively, if you want to use a fixed thread pool (for CPU-bound tasks), you can use rayon's API (for example rayon::join), or, for IO-bound tasks, you can consider switching to async Rust.

Anyway, the crossbeam solution could look like this:

impl<I, A, B, C, R, D, E> ProcessNode<I, (B, C)> for Split<I, A, B, C, R, D, E>
where
    A: Clone + Send,
    B: Send,
    C: Send,
    R: ProcessNode<I, A>,
    D: Send + ProcessNode<A, B>,
    E: Send + ProcessNode<A, C>,
{
    #[inline]
    fn process(&mut self, input: I) -> (B, C) {
        let intermediate1 = self.root_node.process(input);
        let intermediate2 = intermediate1.clone();
        // Run both nodes in parallel
        crossbeam::scope(|scope| {
            let out1 = scope.spawn(|_| self.node1.process(intermediate1));
            let out2 = scope.spawn(|_| self.node2.process(intermediate2));
            (out1.join().unwrap(), out2.join().unwrap())
        })
        .unwrap()
    }
}
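For comparison, here is a sketch of the same two-thread version written against std::thread::scope instead of crossbeam, assuming Rust 1.63 or later. The AddOne, Doubler and ToText nodes are hypothetical stand-ins. The two fields are reborrowed explicitly before the scope so that each move closure owns exactly one &mut:

```rust
use std::marker::PhantomData;
use std::thread;

pub trait ProcessNode<A, B> {
    fn process(&mut self, input: A) -> B;
}

pub struct Split<I, A: Clone, B, C, R: ProcessNode<I, A>, D: ProcessNode<A, B>, E: ProcessNode<A, C>> {
    _marker: PhantomData<(I, A, B, C)>,
    root_node: R,
    node1: D,
    node2: E,
}

impl<I, A, B, C, R, D, E> ProcessNode<I, (B, C)> for Split<I, A, B, C, R, D, E>
where
    A: Clone + Send,
    B: Send,
    C: Send,
    R: ProcessNode<I, A>,
    D: Send + ProcessNode<A, B>,
    E: Send + ProcessNode<A, C>,
{
    fn process(&mut self, input: I) -> (B, C) {
        let intermediate1 = self.root_node.process(input);
        let intermediate2 = intermediate1.clone();
        // Borrow the two fields separately so each thread gets its own &mut.
        let (node1, node2) = (&mut self.node1, &mut self.node2);
        thread::scope(|s| {
            let out1 = s.spawn(move || node1.process(intermediate1));
            let out2 = s.spawn(move || node2.process(intermediate2));
            // join() returns Err only if that thread panicked.
            (out1.join().unwrap(), out2.join().unwrap())
        })
    }
}

// Hypothetical nodes, for illustration only.
struct AddOne;
impl ProcessNode<i32, i32> for AddOne {
    fn process(&mut self, input: i32) -> i32 {
        input + 1
    }
}

// A stateful node: counts how often it has been called.
struct Doubler {
    calls: usize,
}
impl ProcessNode<i32, i32> for Doubler {
    fn process(&mut self, input: i32) -> i32 {
        self.calls += 1;
        input * 2
    }
}

struct ToText;
impl ProcessNode<i32, String> for ToText {
    fn process(&mut self, input: i32) -> String {
        input.to_string()
    }
}

fn main() {
    let mut split: Split<i32, i32, i32, String, AddOne, Doubler, ToText> = Split {
        _marker: PhantomData,
        root_node: AddOne,
        node1: Doubler { calls: 0 },
        node2: ToText,
    };
    let (b, c) = split.process(4);
    println!("{b} {c}"); // prints "10 5"
    assert_eq!(split.node1.calls, 1);
}
```

Unlike crossbeam::scope, std::thread::scope returns the closure's value directly (it panics if an unjoined thread panicked instead of returning a Result), so there is no outer .unwrap().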

Or, if you want to save one thread, run node2 on the current thread:

impl<I, A, B, C, R, D, E> ProcessNode<I, (B, C)> for Split<I, A, B, C, R, D, E>
where
    A: Clone + Send,
    B: Send,
    C: Send,
    R: ProcessNode<I, A>,
    D: Send + ProcessNode<A, B>,
    E: Send + ProcessNode<A, C>,
{
    #[inline]
    fn process(&mut self, input: I) -> (B, C) {
        let intermediate1 = self.root_node.process(input);
        let intermediate2 = intermediate1.clone();
        // Run both nodes in parallel
        crossbeam::scope(|scope| {
            let out1 = scope.spawn(|_| self.node1.process(intermediate1));
            let out2 = self.node2.process(intermediate2);
            (out1.join().unwrap(), out2)
        })
        .unwrap()
    }
}

In that case you could also drop some of the Send bounds (C and E no longer need to be Send, because node2 now runs on the current thread), but that makes the API a bit asymmetrical:

impl<I, A, B, C, R, D, E> ProcessNode<I, (B, C)> for Split<I, A, B, C, R, D, E>
where
    A: Clone + Send,
    B: Send,
    R: ProcessNode<I, A>,
    D: Send + ProcessNode<A, B>,
    E: ProcessNode<A, C>,
{
    #[inline]
    fn process(&mut self, input: I) -> (B, C) {
        let intermediate1 = self.root_node.process(input);
        let intermediate2 = intermediate1.clone();
        // Run both nodes in parallel
        crossbeam::scope(|scope| {
            let out1 = scope.spawn(|_| self.node1.process(intermediate1));
            let out2 = self.node2.process(intermediate2);
            (out1.join().unwrap(), out2)
        })
        .unwrap()
    }
}
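The relaxed bounds can be checked with a small sketch. It uses std::thread::scope rather than crossbeam and factors the pattern into a hypothetical process_both helper; the Doubler and RcLabel nodes are hypothetical too. Because node2 runs on the calling thread, it can hold an Rc (which is not Send) and the code still compiles; only node1, its input and its output cross a thread boundary.

```rust
use std::rc::Rc;
use std::thread;

pub trait ProcessNode<A, B> {
    fn process(&mut self, input: A) -> B;
}

// Hypothetical helper: node1 runs on a scoped thread, node2 on the current thread.
// Only A, B and D need to be Send; C and E have no Send bound.
fn process_both<A, B, C, D, E>(node1: &mut D, node2: &mut E, input: A) -> (B, C)
where
    A: Clone + Send,
    B: Send,
    D: Send + ProcessNode<A, B>,
    E: ProcessNode<A, C>,
{
    let intermediate2 = input.clone();
    thread::scope(|s| {
        // Spawn node1 first so it runs while node2 works on this thread.
        let out1 = s.spawn(move || node1.process(input));
        let out2 = node2.process(intermediate2);
        (out1.join().unwrap(), out2)
    })
}

struct Doubler;
impl ProcessNode<i32, i32> for Doubler {
    fn process(&mut self, input: i32) -> i32 {
        input * 2
    }
}

// Not Send, because it holds an Rc.
struct RcLabel {
    prefix: Rc<String>,
}
impl ProcessNode<i32, String> for RcLabel {
    fn process(&mut self, input: i32) -> String {
        format!("{}{}", self.prefix, input)
    }
}

fn main() {
    let mut node1 = Doubler;
    let mut node2 = RcLabel { prefix: Rc::new("value: ".to_string()) };
    let (b, c) = process_both(&mut node1, &mut node2, 21);
    println!("{b} / {c}"); // prints "42 / value: 21"
}
```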