Parallel work collected sequentially


#1

I have a task that is sort of like map-reduce, but the reduce part has to be done strictly sequentially (the third element depends on the second, the second depends on the first, etc.).

I’d like to perform the map part in parallel, then collect the mapped results and reduce them in order as soon as they’re available (so if the first element is ready, process it immediately; if the third element is ready, buffer it and wait until the second element is ready).

And a gotcha: the mapped elements don’t all fit in memory at once, so the mapping and reducing phases have to run at the same time.

I’ve tried rayon, but its parallel iterator isn’t a regular iterator, so this (theoretically ideal) code didn’t work:

for mapped in input.par_iter().map(m) {
  reduce(mapped);
}

This works in theory, but I run out of memory in practice:

let all_mapped: Vec<_> = input.par_iter().map(m).collect(); // array too large
for mapped in all_mapped {
  reduce(mapped);
}

Can this be done with rayon? Is there another library that would fit?


#2

The general request can be found in rayon#210, but we don’t have a great solution yet.

The workaround that I often suggest is to batch your work in manageable chunks: do the parallel map(m).collect() on just the first part of your input, then serially reduce that, then do another parallel section, reduce again, and so on.

let mut result = init();
for chunk in input.chunks(N) {
    let buffer: Vec<_> = chunk.par_iter().map(m).collect();
    result = reduce(result, buffer);
}

You might even do the reduction and the next chunk’s map in parallel so you don’t have serialization stalls, like:

// do the first chunk on its own
let mut buffer: Vec<_> = input[..N].par_iter().map(m).collect();

let mut result = init();
for chunk in input[N..].chunks(N) {
    // reduce the prior chunk in order, while we also process the next chunk
    let (r, b) = rayon::join(|| reduce(result, buffer),
                             || chunk.par_iter().map(m).collect());
    result = r;
    buffer = b;
}

// reduce the final chunk
reduce(result, buffer)

I wonder if the library could encapsulate some patterns like this, but it may be too domain specific… not sure.
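
To sketch what that could look like, here is a hypothetical pipelined_map_reduce helper wrapping the chunk/join pattern above; the name and signature are made up, not an actual rayon API:

use rayon::prelude::*;

/// Hypothetical helper: map `input` in parallel chunk by chunk, while folding
/// each previously mapped chunk into the accumulator in order.
fn pipelined_map_reduce<I, T, A, M, R>(
    input: &[I],
    chunk_size: usize,
    init: A,
    map: M,
    mut reduce: R,
) -> A
where
    I: Sync,
    T: Send,
    A: Send,
    M: Fn(&I) -> T + Sync,
    R: FnMut(A, Vec<T>) -> A + Send,
{
    assert!(chunk_size > 0);
    if input.is_empty() {
        return init;
    }
    let first = input.len().min(chunk_size);

    // map the first chunk on its own
    let mut buffer: Vec<T> = input[..first].par_iter().map(&map).collect();
    let mut result = init;

    for chunk in input[first..].chunks(chunk_size) {
        // reduce the prior chunk in order, while the next chunk is mapped in parallel
        let (r, b) = rayon::join(
            || reduce(result, buffer),
            || chunk.par_iter().map(&map).collect::<Vec<T>>(),
        );
        result = r;
        buffer = b;
    }

    // reduce the final chunk
    reduce(result, buffer)
}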


#3

OK, I’ve solved it by:

  • a thread pool (the threadpool crate) + an mpsc channel
  • collecting results in a BinaryHeap and popping them in submission order:
    let mut buf = BinaryHeap::new();
    let mut wants = 0;
    for (i, mapped) in rx.iter().take(jobs) {
        buf.push(Pri(i, mapped)); // Pri implements reverse order on the tuple
        while buf.peek().map(|x| x.0) == Some(wants) {
            let Pri(_, mapped) = buf.pop().unwrap();
            wants += 1;
            reduce(mapped);
        }
    }

Full code:
use std::collections::BinaryHeap;
use std::cmp::Ordering;
use threadpool::ThreadPool;
use std::sync::mpsc;

pub struct OrderedParallelQueue<T> {
    pool: ThreadPool,
    elements_queued: usize,
    sender: mpsc::SyncSender<ReverseTuple<T>>,
    receiver: mpsc::Receiver<ReverseTuple<T>>,
}

pub struct OrderedParallelQueueIter<'a, T: 'a> {
    elements_queued: usize,
    next_element: usize,
    elements_done: BinaryHeap<ReverseTuple<T>>,
    receiver_iter: mpsc::Iter<'a, ReverseTuple<T>>,
}

impl<T: Send + 'static> OrderedParallelQueue<T> {
    pub fn new(num_cpus: usize) -> Self {
        let (sender, receiver) = mpsc::sync_channel(num_cpus);
        Self {
            pool: ThreadPool::new(num_cpus),
            elements_queued: 0,
            sender,
            receiver,
        }
    }

    pub fn iter(&mut self) -> OrderedParallelQueueIter<T> {
        let res = OrderedParallelQueueIter {
            elements_queued: self.elements_queued,
            next_element: 0,
            elements_done: BinaryHeap::new(),
            receiver_iter: self.receiver.iter(),
        };
        self.elements_queued = 0; // Otherwise multiple iterators would call the receiver iter too many times
        res
    }

    /// Run the callback on the thread pool and send its result back tagged with its index.
    pub fn push<F>(&mut self, async_callback: F) where F: FnOnce() -> T + Send + 'static {
        let tx = self.sender.clone();
        let i = self.elements_queued;
        self.elements_queued += 1;
        self.pool.execute(move || {
            let res = async_callback();
            tx.send(ReverseTuple(i, res)).unwrap();
        });
    }
}

impl<'a, T> Iterator for OrderedParallelQueueIter<'a, T> {
    type Item = T;
    fn next(&mut self) -> Option<T> {
        if self.next_element >= self.elements_queued {
            return None;
        }

        // Drain the channel until the next expected index is at the top of the heap
        while self.elements_done.peek().map(|i| i.0) != Some(self.next_element) {
            if let Some(item) = self.receiver_iter.next() {
                self.elements_done.push(item);
            } else {
                break; // probably won't happen
            }
        }

        self.next_element += 1;
        self.elements_done.pop().map(|t| t.1)
    }
}

/// Ordered by index, reversed, so the max-heap BinaryHeap pops the smallest index first.
struct ReverseTuple<T>(usize, T);
impl<T> PartialEq for ReverseTuple<T> {
    fn eq(&self, o: &Self) -> bool { o.0.eq(&self.0) }
}
impl<T> Eq for ReverseTuple<T> {}
impl<T> PartialOrd for ReverseTuple<T> {
    fn partial_cmp(&self, o: &Self) -> Option<Ordering> { o.0.partial_cmp(&self.0) }
}
impl<T> Ord for ReverseTuple<T> {
    fn cmp(&self, o: &Self) -> Ordering { o.0.cmp(&self.0) }
}
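
Usage ends up looking roughly like this; expensive_map and reduce below are just placeholder functions standing in for the real map and reduce steps, and this lives in the same file as the code above:

fn expensive_map(x: u64) -> u64 { x * x }      // placeholder map step
fn reduce(acc: u64, x: u64) -> u64 { acc + x } // placeholder sequential reduce

fn main() {
    let mut queue = OrderedParallelQueue::new(4);

    // queue the map jobs; they run on the thread pool and may finish out of order
    for input in 0..100u64 {
        queue.push(move || expensive_map(input));
    }

    // drain results in submission order, reducing each one as soon as it's available
    let mut acc = 0;
    for mapped in queue.iter() {
        acc = reduce(acc, mapped);
    }
    println!("{}", acc);
}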