Why can't I move thread behavior into a function?

My code base is relatively complicated, so let me know if I forgot to add some details here.

I've got a Dynamics trait which defines the kind of structures which can be handled by a propagator. A propagator solves the initial value problem (there are different terms for this, I go with "propagator" which is the common term in my industry). These dynamics have been shown to work on another thread without problem (e.g. here).

At a high level, I'm trying to implement Parareal which can solve the same problem in parallel. This requires cloning the Dynamics structure, and splitting up the computation onto several thread.

The compiler complains about DefaultAllocator::Buffer not being thread-safe. I understand why that would not be the case, but I don't understand why it matters to my problem. I don't need the buffer to be sent around, just the dynamics, and they can have their own allocation.

In the following code, I'm simply trying to have the lofi_dyn dynamics be executed on its own thread, and collect all of the intermediate steps in a channel on the main thread. From my point of view, this is the same as the example shown above where Dynamics are executed on their own thread... the compiler obviously doesn't agree.

What architecture and syntax should I be using to implement the Parareal algorithm?

pub struct Parareal<M: Dynamics, E: ErrorCtrl> {
    pub dynamics: M,
    pub opts: PropOpts<E>,
    pub max_cpus: usize,
}

impl<M: Dynamics, E: ErrorCtrl> Parareal<M, E> {
    pub fn until_time_elapsed(&mut self, elapsed_time: f64) -> (f64, VectorN<f64, M::StateSize>)
    where
        DefaultAllocator: Allocator<f64, M::StateSize>,
        M: Send + Sync,
    {
        // Split up the integration on as many CPUs as possible
        let lofi_step = elapsed_time / self.max_cpus as f64;

        println!("{}", lofi_step);

        let (tx, rx): (
            Sender<VectorN<f64, M::StateSize>>,
            Receiver<VectorN<f64, M::StateSize>>,
        ) = channel();

        let lfopts = PropOpts::with_fixed_step(lofi_step);
        thread::spawn(move || {
            let mut lofi_dyn = self.dynamics.clone();
            let mut lofi = Propagator::new::<RK2Fixed>(&mut lofi_dyn, &lfopts);
            lofi.raw_tx_chan = Some(&tx);
            lofi.until_time_elapsed(elapsed_time);
        });

        while let Ok(rx_state) = rx.recv() {
            println!("{:?}", rx_state);
        }

        (0.0, VectorN::zeros())
    }
}

Here is the error:

error[E0277]: `<propagators::na::DefaultAllocator as od::hyperdual::Allocator<f64, <M as dynamics::Dynamics>::StateSize>>::Buffer` cannot be sent between threads safely
   --> src/propagators/parareal.rs:31:9
    |
31  |         thread::spawn(move || {
    |         ^^^^^^^^^^^^^ `<propagators::na::DefaultAllocator as od::hyperdual::Allocator<f64, <M as dynamics::Dynamics>::StateSize>>::Buffer` cannot be sent between threads safely
    |
    = help: within `propagators::na::Matrix<f64, <M as dynamics::Dynamics>::StateSize, propagators::na::U1, <propagators::na::DefaultAllocator as od::hyperdual::Allocator<f64, <M as dynamics::Dynamics>::StateSize>>::Buffer>`, the trait `std::marker::Send` is not implemented for `<propagators::na::DefaultAllocator as od::hyperdual::Allocator<f64, <M as dynamics::Dynamics>::StateSize>>::Buffer`
    = note: required because it appears within the type `propagators::na::Matrix<f64, <M as dynamics::Dynamics>::StateSize, propagators::na::U1, <propagators::na::DefaultAllocator as od::hyperdual::Allocator<f64, <M as dynamics::Dynamics>::StateSize>>::Buffer>`
    = note: required because of the requirements on the impl of `std::marker::Send` for `std::sync::mpsc::Sender<propagators::na::Matrix<f64, <M as dynamics::Dynamics>::StateSize, propagators::na::U1, <propagators::na::DefaultAllocator as od::hyperdual::Allocator<f64, <M as dynamics::Dynamics>::StateSize>>::Buffer>>`
    = note: required because it appears within the type `[closure@src/propagators/parareal.rs:31:23: 36:10 self:&mut propagators::parareal::Parareal<M, E>, lfopts:propagators::PropOpts<propagators::error_ctrl::RSSStepPV>, tx:std::sync::mpsc::Sender<propagators::na::Matrix<f64, <M as dynamics::Dynamics>::StateSize, propagators::na::U1, <propagators::na::DefaultAllocator as od::hyperdual::Allocator<f64, <M as dynamics::Dynamics>::StateSize>>::Buffer>>, elapsed_time:f64]`

error: aborting due to previous error

I'm having some trouble understanding this code (a reduced version of it might get better help), but I suspect the problem you're having is that you're not sending the dynamics to a separate thread, you're sending self.

thread::spawn(move || {
            let mut lofi_dyn = self.dynamics.clone();
...

The closure is told to move self. You probably want to clone the dynamics outside the closure for the other thread:

let mut lofi_dyn = self.dynamics.clone();
thread::spawn(move || { ...

If that doesn't fix it -- what's a DefaultAllocator? Is its Buffer included in one of these types? I don't actually see it being used anywhere.

I ended up writing a demo function which does what I want, in terms of logic (the math is broken, but that's another problem).

DefaultAllocator is part of nalgebra (a linear algebra toolkit I'm using): https://www.nalgebra.org/rustdoc/nalgebra/base/default_allocator/struct.DefaultAllocator.html .

This topic was automatically closed 90 days after the last reply. New replies are no longer allowed.