I'm trying to use Rayon to do some parallelizable work that converts `History` objects to `Commit` objects, but more than one `Commit` is produced per `History`, so I'm actually producing `Vec<Commit>`. The work is also fallible, so I'm really producing `Result<Vec<Commit>, E>`. The code that is going to use this iterator wants to see an iterator that yields `Result<Commit, E>`, not `Result<Vec<Commit>, E>`, so I need a flattening operation, but:
- I want to stop all processing once the first error is encountered, but I want to preserve the error. Flattening `Result<T, E>` discards errors.
- I think itertools' `flatten_ok` does what I need, but it only works for `Iterator`, not `ParallelIterator`. Rayon has `ParallelBridge`, which lets you convert `Iterator` -> `ParallelIterator`, but not vice versa, so I don't think there's any way for me to use `flatten_ok` as is.
- I want to avoid ever `collect`ing the entire dataset into memory (the number of `Commit`s produced for each `History` is small, so keeping some `Vec<Commit>` in memory is fine, but the overall stream of them can't fit).
- I could use `fold` to build up a big vector of the `Ok` contents from the smaller vectors and do error detection, but this is basically `collect`.
Do I need to define my own `ParallelIterator` to do this? Or is there some easy combination I'm missing?
Sketch of what I have:
```rust
let filtered_history: Result<Vec<Commit>, Error> = HistoryIterator::new(commit.clone())
    .enumerate()
    .par_bridge()
    .map(|x| {
        if early_exit.load(Ordering::Relaxed) {
            info!("Ctrl-c detected, ending loading.");
            return None;
        }
        Some(x)
    })
    .while_some()
    .map(|(count, commit)| -> Result<Vec<Commit>, Error> {
        // ... implementation details here
    })
    // TODO: something here to flatten incoming stream of
    // Result<Vec<Commit>, E> into a stream of Result<Commit, E>
    ;
```
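For clarity, this is the per-item flattening I mean, written sequentially with only std. The names `flatten_result` and `sum_until_error` are hypothetical, and `u32`/`String` again stand in for `Commit`/`Error`. On a plain `Iterator` the helper plugs into `flat_map`, and `try_fold` stops pulling items as soon as it sees the first `Err`, returning that error; what I'm missing is the equivalent for the `ParallelIterator` above.

```rust
/// Hypothetical helper: turn one `Result<Vec<T>, E>` into an iterator of
/// `Result<T, E>` (the per-item behaviour I want from `flatten_ok`).
/// `Ok(vec)` yields `Ok(x)` for each element; `Err(e)` yields a single `Err(e)`.
fn flatten_result<T, E>(r: Result<Vec<T>, E>) -> impl Iterator<Item = Result<T, E>> {
    let (oks, err) = match r {
        Ok(v) => (v, None),
        Err(e) => (Vec::new(), Some(e)),
    };
    oks.into_iter().map(Ok::<T, E>).chain(err.map(Err::<T, E>))
}

/// Sequential stand-in for the consumer: sums the items, stopping at the
/// first `Err` and returning it. `try_fold` short-circuits, so batches after
/// the error are never pulled from the iterator.
fn sum_until_error(batches: Vec<Result<Vec<u32>, String>>) -> Result<u32, String> {
    batches
        .into_iter()
        .flat_map(flatten_result)
        .try_fold(0, |acc, item| item.map(|x| acc + x))
}

fn main() {
    let ok = vec![Ok(vec![1, 2]), Ok(vec![3])];
    let bad = vec![Ok(vec![1, 2]), Err("bad history".to_string()), Ok(vec![3])];
    println!("{:?}", sum_until_error(ok)); // Ok(6)
    println!("{:?}", sum_until_error(bad)); // Err("bad history")
}
```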