OK, I've solved it by:
- using a threadpool together with an mpsc channel
- collecting the results in a `BinaryHeap`
// Results that arrived ahead of their turn wait here until every earlier
// index has been reduced.
let mut buf = BinaryHeap::new();
// Index of the next result that may be handed to `reduce`.
let mut wants = 0;
for (idx, value) in rx.iter().take(jobs) {
    // Pri implements reverse order on the tuple, so the lowest index sits on top.
    buf.push(Pri(idx, value));
    // Flush every buffered result that is now next in sequence.
    while buf.peek().map_or(false, |head| head.0 == wants) {
        if let Some(Pri(_, ready)) = buf.pop() {
            wants += 1;
            reduce(ready);
        }
    }
}
use std::collections::BinaryHeap; use std::cmp::Ordering; use threadpool::ThreadPool; use std::sync::mpsc; pub struct OrderedParallelQueue< T> { pool: ThreadPool, elements_queued: usize, sender: mpsc::SyncSender< ReverseTuple< T>>, receiver: mpsc::Receiver< ReverseTuple< T>>, } pub struct OrderedParallelQueueIter< 'a, T: 'a> { elements_queued: usize, next_element: usize, elements_done: BinaryHeap< ReverseTuple< T>>, receiver_iter: mpsc::Iter< 'a, ReverseTuple< T>>, } impl< T: Send + 'static> OrderedParallelQueue< T> { pub fn new(num_cpus: usize) -> Self { let (sender, receiver) = mpsc::sync_channel(num_cpus); Self { pool: ThreadPool::new(num_cpus), elements_queued: 0, sender, receiver, } } pub fn iter(&mut self) -> OrderedParallelQueueIter< T> { let res = OrderedParallelQueueIter { elements_queued: self.elements_queued, next_element: 0, elements_done: BinaryHeap::new(), receiver_iter: self.receiver.iter(), }; self.elements_queued = 0; // Otherwise multiple iterators would call receiver iter too many times res } pub fn push< F>(&mut self, async_callback: F) where F: FnOnce() -> T + Send + 'static { let tx = self.sender.clone(); let i = self.elements_queued; self.elements_queued += 1; self.pool.execute(move || { let res = async_callback(); tx.send(ReverseTuple(i, res)).unwrap(); }); } } impl< 'a, T> Iterator for OrderedParallelQueueIter< 'a, T> { type Item = T; fn next(&mut self) -> Option< T> { if self.next_element >= self.elements_queued { return None; } while self.elements_done.peek().map(|i|i.0) != Some(self.next_element) { if let Some(item) = self.receiver_iter.next() { self.elements_done.push(item); } else { break; // probably won't happen } } self.next_element += 1; self.elements_done.pop().map(|t|t.1) } } struct ReverseTuple< T>(usize, T); impl< T> PartialEq for ReverseTuple< T> { fn eq(&self, o: &Self) -> bool { o.0.eq(&self.0) } } impl< T> Eq for ReverseTuple< T> {} impl< T> PartialOrd for ReverseTuple< T> { fn partial_cmp(&self, o: &Self) -> Option< 
Ordering> { o.0.partial_cmp(&self.0) } } impl< T> Ord for ReverseTuple< T> { fn cmp(&self, o: &Self) -> Ordering { o.0.cmp(&self.0) } }