Parallel work collected sequentially

OK, I've solved it by:

  • a threadpool plus an mpsc channel
  • collecting the results in a BinaryHeap and releasing them in index order:
    // Re-sequencing buffer: results arrive in completion order and are handed
    // to `reduce` strictly in index order, starting from index 0.
    // NOTE(review): `rx`, `jobs`, `Pri` and `reduce` are defined outside this
    // snippet; `rx` is presumably the mpsc receiver fed by the pool workers.
    let mut buf = BinaryHeap::new();
    // Index of the next result that may be passed to `reduce`.
    let mut wants = 0;
    // Stop after `jobs` results have been received.
    for (i, mapped) in rx.iter().take(jobs) {
        buf.push(Pri(i, mapped)); // Pri implements reverse order on the tuple
        // With that reversed ordering the heap's top is the lowest buffered
        // index; drain for as long as it is exactly the one we are waiting for.
        while buf.peek().map(|x|x.0) == Some(wants) {
            // `peek` just returned `Some`, so `pop` cannot be `None` here.
            let Pri(_, mapped) = buf.pop().unwrap();
            wants += 1;
            reduce(mapped);
        }
    }
use std::collections::BinaryHeap;
use std::cmp::Ordering;
use threadpool::ThreadPool;
use std::sync::mpsc;

/// Runs closures on a thread pool and hands their results back in the order
/// the closures were pushed, regardless of the order in which they finish.
pub struct OrderedParallelQueue<T> {
    /// Worker pool that executes the pushed closures.
    pool: ThreadPool,
    /// Number of closures pushed since the last call to `iter`; also the
    /// index that the next pushed closure will be tagged with.
    elements_queued: usize,
    /// Sending half of the result channel; cloned into every pushed job.
    sender: mpsc::SyncSender<ReverseTuple<T>>,
    /// Receiving half of the result channel; read by the iterator.
    receiver: mpsc::Receiver<ReverseTuple<T>>,
}

/// Iterator over the results of an `OrderedParallelQueue`, yielded in push
/// order.
pub struct OrderedParallelQueueIter<'a, T: 'a> {
    /// Total number of results this iterator will try to yield.
    elements_queued: usize,
    /// Index of the result that has to be yielded next.
    next_element: usize,
    /// Results that have arrived but are not yet due. `ReverseTuple` compares
    /// in reverse, so the heap's top is the lowest buffered index.
    elements_done: BinaryHeap<ReverseTuple<T>>,
    /// Iterator over the queue's result channel.
    receiver_iter: mpsc::Iter<'a, ReverseTuple<T>>,
}

impl<T: Send + 'static> OrderedParallelQueue<T> {
    /// Creates a queue whose jobs run on a thread pool of `num_cpus` workers.
    ///
    /// The result channel is bounded to `num_cpus` entries, so a worker whose
    /// result does not fit blocks in `send` until the consumer makes room.
    pub fn new(num_cpus: usize) -> Self {
        let (sender, receiver) = mpsc::sync_channel(num_cpus);
        Self {
            pool: ThreadPool::new(num_cpus),
            elements_queued: 0,
            sender,
            receiver,
        }
    }

    /// Returns an iterator over the results of every closure pushed since the
    /// previous call to `iter`, in push order.
    ///
    /// The pushed-job counter is reset, so closures pushed afterwards are
    /// numbered from zero again and belong to the next iterator.
    ///
    /// NOTE(review): an iterator that is dropped before it has been drained
    /// leaves its remaining results in the channel, where their indices
    /// overlap the next batch's numbering — confirm callers always drain.
    pub fn iter(&mut self) -> OrderedParallelQueueIter<T> {
        // Hand the count to the new iterator and zero ours in one step, so a
        // later iterator does not wait for results this one already consumed.
        let elements_queued = std::mem::replace(&mut self.elements_queued, 0);
        OrderedParallelQueueIter {
            elements_queued,
            next_element: 0,
            elements_done: BinaryHeap::new(),
            receiver_iter: self.receiver.iter(),
        }
    }

    /// Schedules `async_callback` on the pool and tags its result with the
    /// next sequence index so the iterator can restore push order.
    pub fn push<F>(&mut self, async_callback: F)
    where
        F: FnOnce() -> T + Send + 'static,
    {
        let tx = self.sender.clone();
        let index = self.elements_queued;
        self.elements_queued += 1;
        self.pool.execute(move || {
            let result = async_callback();
            // The only receiver is owned by the queue, so `send` fails exactly
            // when the queue has been dropped. Nobody can consume the result
            // then; discard it instead of panicking on the worker thread.
            let _ = tx.send(ReverseTuple(index, result));
        });
    }
}

impl< 'a, T> Iterator for OrderedParallelQueueIter< 'a, T> {
    type Item = T;
    fn next(&mut self) -> Option< T> {
        if self.next_element >= self.elements_queued {
            return None;
        }

        while self.elements_done.peek().map(|i|i.0) != Some(self.next_element) {
            if let Some(item) = self.receiver_iter.next() {
                self.elements_done.push(item);
            } else {
                break; // probably won't happen
            }
        }

        self.next_element += 1;
        self.elements_done.pop().map(|t|t.1)
    }
}

/// An `(index, payload)` pair that is compared by index only, in *reverse*
/// order: a smaller index compares as greater. Placed in std's max-heap
/// `BinaryHeap`, this makes the entry with the lowest index sit on top.
struct ReverseTuple<T>(usize, T);

impl<T> Ord for ReverseTuple<T> {
    /// Orders by index, descending; the payload is ignored.
    fn cmp(&self, other: &Self) -> Ordering {
        self.0.cmp(&other.0).reverse()
    }
}

impl<T> PartialOrd for ReverseTuple<T> {
    /// Always `Some`, consistent with `cmp`.
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl<T> PartialEq for ReverseTuple<T> {
    /// Two entries are equal when their indices match; the payload is ignored.
    fn eq(&self, other: &Self) -> bool {
        self.0 == other.0
    }
}

impl<T> Eq for ReverseTuple<T> {}
2 Likes