I'd like to implement a pattern I have used in other languages for what is essentially an expensive reduction. An effective way to realize it is to keep one copy of the structure being reduced per thread; each thread then work-steals from a queue and updates its local structure.
Finally, once the work queue is empty, the threads' structures are reduced into a final result.
I can almost do this with TLS, but I don't know how to do the final pass, because as far as I know there is no way to map one closure to one thread in rayon.
Another way is to have a heap-allocated array of the structures, indexed by thread ID, but I don't understand the magic Send work I need to do to handle it.
This is an example, albeit for a very simple reduction of u64s rather than a more complex struct.
use rayon::prelude::*;
use std::cell::RefCell;

thread_local! {
    pub static SUM: RefCell<u64> = RefCell::new(0);
}

fn main() {
    let N = 1u64 << 20;

    // v0: closed-form
    println!("{}", N * (N-1) / 2);

    // v1: serial
    let sum: u64 = (0..N).sum();
    println!("{}", sum);

    // v2: rayon reduce
    let psum: u64 = (0..N).into_par_iter().reduce(|| 0, |x, y| x + y);
    println!("{}", psum);

    // v3: pool, with TLS
    let pool = rayon::ThreadPoolBuilder::new()
        .num_threads(8)
        .build()
        .unwrap();
    let range = 0..1 << 20;
    pool.scope(|s| {
        for x in range {
            s.spawn(move |_| SUM.with(|m| *m.borrow_mut() += x));
        }
    });
    // Except: how do I now access SUM once on each thread to do the reduction?

    // v4: Manual arrays
    let pool = rayon::ThreadPoolBuilder::new()
        .num_threads(8)
        .build()
        .unwrap();
    let mut temps = vec![0u64; pool.current_num_threads()];
    let temps = temps.as_mut_ptr();
    // how do I make this Send?
    let range = 0..1 << 20;
    pool.scope(|s| {
        [{
            for x in range {
                s.spawn(|_| unsafe { *(temps.offset(rayon::current_thread_index().unwrap() as isize)) += x}) ;
            }
        }]
    });
}
Errors:
   Compiling playground v0.0.1 (/playground)
error[E0277]: `*mut u64` cannot be shared between threads safely
   --> src/main.rs:57:19
    |
57  |                 s.spawn(|_| unsafe { *(temps.offset(rayon::current_thread_index().unwrap() as isize)) += x}) ;
    |                   ^^^^^ `*mut u64` cannot be shared between threads safely
    |
    = help: the trait `Sync` is not implemented for `*mut u64`
    = note: required because of the requirements on the impl of `Send` for `&*mut u64`
    = note: required because it appears within the type `[closure@src/main.rs:57:25: 57:108]`
note: required by a bound in `rayon::Scope::<'scope>::spawn`
   --> /playground/.cargo/registry/src/github.com-1ecc6299db9ec823/rayon-core-1.9.3/src/scope/mod.rs:539:40
    |
539 |         BODY: FnOnce(&Scope<'scope>) + Send + 'scope,
    |                                        ^^^^ required by this bound in `rayon::Scope::<'scope>::spawn`

For more information about this error, try `rustc --explain E0277`.
error: could not compile `playground` due to previous error
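
For comparison, here is a minimal sketch of the pattern described at the top (one accumulator per thread, a shared work queue, then a final reduction pass). It is written against the standard library only, not rayon. It assumes `std::thread::scope` (Rust 1.63 or later) stands in for the rayon pool and that an atomic counter handing out fixed-size chunks stands in for the work-stealing queue. The names `per_thread_sum`, `CHUNK` and `next` are made up for illustration. Each worker receives an exclusive `&mut` slot of `temps` instead of a shared raw pointer, which is why no `unsafe` or manual `Send`/`Sync` work appears here.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::thread;

/// Sums 0..n with `num_threads` workers, each owning one slot of `temps`.
/// Hypothetical helper: a sketch of the pattern, not rayon's API.
fn per_thread_sum(n: u64, num_threads: usize) -> u64 {
    assert!(num_threads > 0, "need at least one worker");

    // Assumed chunk size; each claim on the queue hands out this many items.
    const CHUNK: u64 = 1024;

    // Shared "work queue": the start index of the next unclaimed chunk.
    let next = AtomicU64::new(0);

    // One accumulator per thread, heap-allocated, indexed by worker.
    let mut temps = vec![0u64; num_threads];

    thread::scope(|s| {
        // iter_mut() yields disjoint `&mut u64` slots, so every worker gets
        // exclusive access to its own accumulator and the closure is Send.
        for slot in temps.iter_mut() {
            let next = &next;
            s.spawn(move || {
                let mut local = 0u64;
                loop {
                    // Claim the next chunk; stop once the range is exhausted.
                    let start = next.fetch_add(CHUNK, Ordering::Relaxed);
                    if start >= n {
                        break;
                    }
                    let end = (start + CHUNK).min(n);
                    local += (start..end).sum::<u64>();
                }
                // Publish this worker's partial result once, at the end.
                *slot = local;
            });
        }
    }); // all workers have been joined when scope() returns

    // Final pass: reduce the per-thread accumulators on the calling thread.
    temps.iter().sum()
}
```

Calling `per_thread_sum(1 << 20, 8)` should give the same value as the closed-form v0 above. With a more complex struct, the `u64` slots would become instances of that struct and the final `sum()` would become whatever merge operation the struct supports.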