My code base is relatively complicated, so let me know if I forgot to add some details here.
I've got a Dynamics trait which defines the kind of structures which can be handled by a propagator. A propagator solves the initial value problem (there are different terms for this; I go with "propagator", which is the common term in my industry). These dynamics have been shown to work on another thread without problem (e.g. here).
At a high level, I'm trying to implement Parareal, which can solve the same problem in parallel. This requires cloning the Dynamics structure and splitting up the computation across several threads.
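To make the goal concrete, here is a minimal sketch of the Parareal iteration on a scalar test problem, using only the standard library. Everything in it is a stand-in and not part of my code base: `rhs`, `coarse`, `fine` and `parareal` are hypothetical names, the test equation dy/dt = -y replaces the real Dynamics, and the coarse (Euler) and fine (RK4) steppers replace the real propagators. It assumes Rust 1.63 or later for `std::thread::scope`.

```rust
use std::thread;

/// Right-hand side of the test problem dy/dt = -y (stand-in for the real dynamics).
fn rhs(_t: f64, y: f64) -> f64 {
    -y
}

/// Coarse propagator G: a single explicit Euler step over [t, t + dt].
fn coarse(t: f64, y: f64, dt: f64) -> f64 {
    y + dt * rhs(t, y)
}

/// Fine propagator F: `n` classical RK4 sub-steps over [t, t + dt].
fn fine(t: f64, y: f64, dt: f64, n: usize) -> f64 {
    let h = dt / n as f64;
    let (mut t, mut y) = (t, y);
    for _ in 0..n {
        let k1 = rhs(t, y);
        let k2 = rhs(t + 0.5 * h, y + 0.5 * h * k1);
        let k3 = rhs(t + 0.5 * h, y + 0.5 * h * k2);
        let k4 = rhs(t + h, y + h * k3);
        y += h / 6.0 * (k1 + 2.0 * k2 + 2.0 * k3 + k4);
        t += h;
    }
    y
}

/// Parareal over [0, total], split into `slices` windows, with `iters` corrections.
fn parareal(y0: f64, total: f64, slices: usize, iters: usize) -> f64 {
    let dt = total / slices as f64;
    // Initial guess: one serial sweep with the coarse propagator.
    let mut u = vec![y0; slices + 1];
    for n in 0..slices {
        u[n + 1] = coarse(n as f64 * dt, u[n], dt);
    }
    for _ in 0..iters {
        // Fine solves for all windows run in parallel, each from a copy of its start state.
        let fine_end: Vec<f64> = thread::scope(|s| {
            let handles: Vec<_> = (0..slices)
                .map(|n| {
                    let start = u[n];
                    s.spawn(move || fine(n as f64 * dt, start, dt, 100))
                })
                .collect();
            handles.into_iter().map(|h| h.join().unwrap()).collect()
        });
        // Serial correction: u_new[n+1] = G(u_new[n]) + F(u_old[n]) - G(u_old[n]).
        let old = u.clone();
        for n in 0..slices {
            let t = n as f64 * dt;
            u[n + 1] = coarse(t, u[n], dt) + fine_end[n] - coarse(t, old[n], dt);
        }
    }
    u[slices]
}

fn main() {
    let y = parareal(1.0, 1.0, 4, 4);
    println!("parareal: {y}, exact: {}", (-1.0f64).exp());
}
```

Scoped threads are used here only because they let each worker borrow from the caller without a `'static` bound; with `thread::spawn`, each worker would need to own a clone of its inputs.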
The compiler complains about DefaultAllocator::Buffer not being thread-safe. I understand why that would not be the case, but I don't understand why it matters for my problem. I don't need the buffer to be sent around, just the dynamics, and they can have their own allocation.
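As far as I can tell, the buffer does get involved indirectly: a channel whose item type is a vector stored in that buffer can only be moved into a spawned thread if the buffer type is `Send`. Below is a std-only sketch of the same situation, with a hypothetical `Alloc` trait and `VecAlloc` type standing in for nalgebra's `Allocator` and `DefaultAllocator` (none of these names come from nalgebra). Removing the `A::Buffer: Send + 'static` bound should reproduce an E0277 error of the same shape. I assume the analogous fix in my code would be a `where` bound on `<DefaultAllocator as Allocator<f64, M::StateSize>>::Buffer: Send`, but I have not verified that.

```rust
use std::sync::mpsc::channel;
use std::thread;

/// Stand-in for an allocator trait: the storage type is an associated type.
trait Alloc {
    type Buffer;
    fn zeros() -> Self::Buffer;
}

/// Hypothetical allocator whose buffer is a plain Vec<f64>.
struct VecAlloc;

impl Alloc for VecAlloc {
    type Buffer = Vec<f64>;
    fn zeros() -> Vec<f64> {
        vec![0.0; 3]
    }
}

/// Generic over the allocator. Moving `tx` into the spawned closure requires
/// `Sender<A::Buffer>: Send`, which in turn requires `A::Buffer: Send`.
/// The `'static` bounds are there because the thread may outlive the caller.
fn produce_on_thread<A: Alloc + 'static>() -> Vec<A::Buffer>
where
    A::Buffer: Send + 'static,
{
    let (tx, rx) = channel::<A::Buffer>();
    let worker = thread::spawn(move || {
        for _ in 0..2 {
            tx.send(A::zeros()).unwrap();
        }
        // `tx` is dropped here, which ends the receive loop below.
    });
    let states: Vec<A::Buffer> = rx.iter().collect();
    worker.join().unwrap();
    states
}

fn main() {
    let states = produce_on_thread::<VecAlloc>();
    println!("{:?}", states);
}
```

The point of the sketch is that the generic function never names a concrete buffer type, so the compiler cannot know it is `Send` unless the bound says so.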
In the following code, I'm simply trying to have the lofi_dyn dynamics be executed on its own thread, and collect all of the intermediate steps in a channel on the main thread. From my point of view, this is the same as the example linked above, where Dynamics are executed on their own thread... the compiler obviously doesn't agree.
What architecture and syntax should I be using to implement the Parareal algorithm?
pub struct Parareal<M: Dynamics, E: ErrorCtrl> {
    pub dynamics: M,
    pub opts: PropOpts<E>,
    pub max_cpus: usize,
}

impl<M: Dynamics, E: ErrorCtrl> Parareal<M, E> {
    pub fn until_time_elapsed(&mut self, elapsed_time: f64) -> (f64, VectorN<f64, M::StateSize>)
    where
        DefaultAllocator: Allocator<f64, M::StateSize>,
        M: Send + Sync,
    {
        // Split up the integration on as many CPUs as possible
        let lofi_step = elapsed_time / self.max_cpus as f64;
        println!("{}", lofi_step);
        let (tx, rx): (
            Sender<VectorN<f64, M::StateSize>>,
            Receiver<VectorN<f64, M::StateSize>>,
        ) = channel();
        let lfopts = PropOpts::with_fixed_step(lofi_step);
        thread::spawn(move || {
            let mut lofi_dyn = self.dynamics.clone();
            let mut lofi = Propagator::new::<RK2Fixed>(&mut lofi_dyn, &lfopts);
            lofi.raw_tx_chan = Some(&tx);
            lofi.until_time_elapsed(elapsed_time);
        });
        while let Ok(rx_state) = rx.recv() {
            println!("{:?}", rx_state);
        }
        (0.0, VectorN::zeros())
    }
}
Here is the error:
error[E0277]: `<propagators::na::DefaultAllocator as od::hyperdual::Allocator<f64, <M as dynamics::Dynamics>::StateSize>>::Buffer` cannot be sent between threads safely
  --> src/propagators/parareal.rs:31:9
   |
31 |         thread::spawn(move || {
   |         ^^^^^^^^^^^^^ `<propagators::na::DefaultAllocator as od::hyperdual::Allocator<f64, <M as dynamics::Dynamics>::StateSize>>::Buffer` cannot be sent between threads safely
   |
= help: within `propagators::na::Matrix<f64, <M as dynamics::Dynamics>::StateSize, propagators::na::U1, <propagators::na::DefaultAllocator as od::hyperdual::Allocator<f64, <M as dynamics::Dynamics>::StateSize>>::Buffer>`, the trait `std::marker::Send` is not implemented for `<propagators::na::DefaultAllocator as od::hyperdual::Allocator<f64, <M as dynamics::Dynamics>::StateSize>>::Buffer`
= note: required because it appears within the type `propagators::na::Matrix<f64, <M as dynamics::Dynamics>::StateSize, propagators::na::U1, <propagators::na::DefaultAllocator as od::hyperdual::Allocator<f64, <M as dynamics::Dynamics>::StateSize>>::Buffer>`
= note: required because of the requirements on the impl of `std::marker::Send` for `std::sync::mpsc::Sender<propagators::na::Matrix<f64, <M as dynamics::Dynamics>::StateSize, propagators::na::U1, <propagators::na::DefaultAllocator as od::hyperdual::Allocator<f64, <M as dynamics::Dynamics>::StateSize>>::Buffer>>`
= note: required because it appears within the type `[closure@src/propagators/parareal.rs:31:23: 36:10 self:&mut propagators::parareal::Parareal<M, E>, lfopts:propagators::PropOpts<propagators::error_ctrl::RSSStepPV>, tx:std::sync::mpsc::Sender<propagators::na::Matrix<f64, <M as dynamics::Dynamics>::StateSize, propagators::na::U1, <propagators::na::DefaultAllocator as od::hyperdual::Allocator<f64, <M as dynamics::Dynamics>::StateSize>>::Buffer>>, elapsed_time:f64]`
error: aborting due to previous error
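The closure type in the last note also shows that `self` is captured as `&mut Parareal<M, E>` along with `tx`, so the spawned thread is borrowing from the caller. For comparison, here is a std-only sketch of the pattern I am aiming for, where the dynamics are cloned before the spawn and the thread owns both the clone and the `Sender`. `Decay`, `propagate` and `collect_steps` are hypothetical names used only for illustration; they are not part of my code base.

```rust
use std::sync::mpsc::{channel, Sender};
use std::thread;

/// Hypothetical stand-in for a `Dynamics` implementor: dy/dt = -rate * y.
#[derive(Clone)]
struct Decay {
    rate: f64,
    state: f64,
}

impl Decay {
    /// Fixed-step Euler propagation that reports each intermediate state on `tx`.
    fn propagate(&mut self, steps: usize, dt: f64, tx: Sender<f64>) {
        for _ in 0..steps {
            self.state += dt * (-self.rate * self.state);
            // Ignore the error if the receiver has already hung up.
            let _ = tx.send(self.state);
        }
    }
}

/// Runs a clone of `dynamics` on its own thread and collects every step.
fn collect_steps(dynamics: &Decay, steps: usize, dt: f64) -> Vec<f64> {
    let (tx, rx) = channel();
    // Clone on the calling thread so the closure owns its data
    // instead of capturing a borrow of `dynamics`.
    let mut lofi_dyn = dynamics.clone();
    let worker = thread::spawn(move || lofi_dyn.propagate(steps, dt, tx));
    // The iterator ends once the worker drops its `Sender`.
    let states: Vec<f64> = rx.iter().collect();
    worker.join().unwrap();
    states
}

fn main() {
    let dynamics = Decay { rate: 1.0, state: 1.0 };
    let states = collect_steps(&dynamics, 4, 0.25);
    println!("{:?}", states);
}
```

In this sketch the original `dynamics` is left untouched; only the clone is advanced on the worker thread.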