Per-thread storage patterns & custom reductions

I'd like to implement a pattern I've used in other languages that is essentially an expensive reduction. An effective way to realize it is to keep one copy of the structure to be reduced per thread; each thread then work-steals from a queue, updating its local structure.

Finally, after the queue of work is empty, we reduce the threads' structures into a final result.

I can almost do this with TLS, but I don't know how to do the final pass, because there's no way I know of to map one closure to one thread in rayon.

Another way is to have a heap array of the structures, indexed by thread id, but I don't understand the Send magic I need to do to handle it.

This is an example, albeit for a very simple reduction of u64s rather than some more complex struct.

use rayon::prelude::*;
use std::cell::RefCell;

thread_local! {
    pub static SUM: RefCell<u64> = RefCell::new(0);
}

fn main() {

    let N = 1u64 << 20;

    // v0: closed-form
    println!("{}", N * (N-1) / 2);

    // v1: serial
    let sum: u64 = (0..N).sum();

    println!("{}", sum);

    // v2: rayon reduce
    let psum: u64 = (0..N).into_par_iter().reduce(|| 0, |x, y| x + y);

    println!("{}", psum);

    // v3: pool, with TLS
    let pool = rayon::ThreadPoolBuilder::new()
        .num_threads(8)
        .build()
        .unwrap();

    let range = 0..1 << 20;
    pool.scope(|s| {
        for x in range {
            s.spawn(move |_| SUM.with(|m| *m.borrow_mut() += x));
        }
    });

    // Except: how do I access SUM once for each thread to do the reduction now?


    // v4: Manual arrays
    let pool = rayon::ThreadPoolBuilder::new()
        .num_threads(8)
        .build()
        .unwrap();

    let mut temps = vec![0u64; pool.current_num_threads()];

    let temps = temps.as_mut_ptr();

    // how do I make this Send?

    let range = 0..1 << 20;
    pool.scope(|s| {
        for x in range {
            s.spawn(|_| unsafe { *(temps.offset(rayon::current_thread_index().unwrap() as isize)) += x}) ;
        }
    });
}

(Playground)

Errors:

   Compiling playground v0.0.1 (/playground)
error[E0277]: `*mut u64` cannot be shared between threads safely
   --> src/main.rs:57:19
    |
57  |                 s.spawn(|_| unsafe { *(temps.offset(rayon::current_thread_index().unwrap() as isize)) += x}) ;
    |                   ^^^^^ `*mut u64` cannot be shared between threads safely
    |
    = help: the trait `Sync` is not implemented for `*mut u64`
    = note: required because of the requirements on the impl of `Send` for `&*mut u64`
    = note: required because it appears within the type `[closure@src/main.rs:57:25: 57:108]`
note: required by a bound in `rayon::Scope::<'scope>::spawn`
   --> /playground/.cargo/registry/src/github.com-1ecc6299db9ec823/rayon-core-1.9.3/src/scope/mod.rs:539:40
    |
539 |         BODY: FnOnce(&Scope<'scope>) + Send + 'scope,
    |                                        ^^^^ required by this bound in `rayon::Scope::<'scope>::spawn`

For more information about this error, try `rustc --explain E0277`.
error: could not compile `playground` due to previous error

Well, technically, you could just wrap the raw pointer in a struct for which you unsafely implement Send, but it's very likely that using raw pointers and unsafe is not the right solution. For something as structured as this problem, where there is no need to interact with FFI or manage your own memory, you should absolutely stick to safe code.

In this specific case, you could just use atomics (Playground). (Here is also a smaller test case that actually manages to complete within the time limit of the Playground.)

Thanks!

I'm aware of atomics, and they aren't suitable for performance reasons (with one integer per task, this code is terrible anyway, but I'm very aware of the costs of atomics on CPUs).

They also won't work if we need to do more complicated things like update a compound data structure. Sure, we could use mutexes or some other locking mechanism, but contention would still be a problem.

I'm fairly certain the most efficient way to do this sort of thing on a contemporary CPU is to privatize the result per thread, then have each thread do their own accumulation on their local copy, and finally to do some sort of tree reduction across the thread-local copies.

There's really no reason the storage couldn't live on each thread's stack rather than statically or on the heap, for that matter.

Here's how I figured out how to do this 'manually':

use core::ops::Range;
use crossbeam_channel::unbounded;
use divide_range::RangeDivisions;
use num_cpus;
use rayon::prelude::*;
use std::sync::mpsc::channel;
use std::thread;
use std::time::Instant;

#[derive(Debug)]
struct FancyStats {
    count: u64,
    sum: u64,
    heap: Vec<u64>,
}

impl FancyStats {
    fn new() -> FancyStats {
        FancyStats {
            count: 0u64,
            sum: 0u64,
            heap: vec![0; 256],
        }
    }

    fn new_from(v: u64) -> FancyStats {
        let mut res = FancyStats {
            count: 1u64,
            sum: v,
            heap: vec![0; 256],
        };
        res.heap[v as usize] = 1;
        res
    }

    fn combine_from(&mut self, v: u64) {
        self.count += 1;
        self.sum += v;
        self.heap[v as usize] += 1;
    }

    fn add(&self, other: &FancyStats) -> FancyStats {
        let mut res = FancyStats {
            count: self.count + other.count,
            sum: self.sum + other.sum,
            heap: vec![0; 256],
        };
        for i in 0..res.heap.len() {
            res.heap[i] = self.heap[i] + other.heap[i];
        }
        res
    }
}

fn main() {
    let limit = 1 << 30;

    // Rayon... very compact, but cannot help but make O(limit) FancyStats
    let start = Instant::now();
    let res = (0..limit)
        .into_par_iter()
        .map(|x| FancyStats::new_from((x % 256) as u64))
        .reduce(FancyStats::new, |x, y| FancyStats::add(&x, &y));
    let duration = start.elapsed();
    println!("{:?}, in {:?}", res, duration);

    // My approach... wordy and probably inefficient in some details, but doesn't make more FancyStats than it needs to
    let start = Instant::now();
    let (finish_s, finish_r) = channel();
    let (s, r) = unbounded();

    let nthreads = num_cpus::get_physical();

    (0..nthreads).for_each(|_| {
        let r = r.clone();
        let finish_s = finish_s.clone();
        thread::spawn(move || {
            let mut my_stats = FancyStats::new();
            r.iter().for_each(|iter: Range<u32>| {
                iter.for_each(|x| my_stats.combine_from((x % 256) as u64))
            });
            finish_s.send(my_stats).unwrap();
        });
    });

    (0..limit)
        .divide_evenly_into(1 << 10)
        .for_each(|i| s.try_send(i).expect("noo"));
    drop(s);

    let res = finish_r.iter().take(nthreads).reduce(|a, e| a.add(&e));
    let duration = start.elapsed();
    println!("{:?}, in {:?}", res, duration);
}

Output (on 32-core Threadripper):

 target/release/parsteal 
FancyStats { count: 1073741824, sum: 136902082560, heap: [<snip> , 4194304] }, in 6.532095065s
Some(FancyStats { count: 1073741824, sum: 136902082560, heap: [<snip>, 4194304] }), in 39.249481ms

I'd be curious about others' thoughts on this pattern and how I could do better / how it could fit into rayon (if at all).

There is one answer to “how to collect thread local data from each thread” in this old WIP pull request: Add ThreadPool::broadcast by cuviper · Pull Request #492 · rayon-rs/rayon · GitHub. To have it work repeatably, you'd first broadcast to set a starting value, then broadcast at the end to fetch the results.

I would start by using fold or reduce, since that's an approximation of what you want to do! However, rayon may split aggressively and create more jobs to resolve than there are threads; using .with_min_len() can mitigate that.

In your particular example, just using regular exclusive mutable references could also work: one &mut per thread job (but then you have to split the work manually).

    let mut temps = vec![0u64; pool.current_num_threads()];

    pool.scope(|s| {
        for (index, temp) in temps.iter_mut().enumerate() {
            s.spawn(move |_| *temp += index as u64);
        }
    });

Hmm, I should try to revisit that broadcast and actually ship it...