Itertools flatten_ok for ParallelIterator? Transposing Vec/Result

I'm trying to use Rayon to parallelize work that converts History objects into Commit objects. Each History yields more than one Commit, so each unit of work produces a Vec<Commit>, and the work is fallible, so it really produces Result<Vec<Commit>, E>. The code that consumes this iterator expects items of type Result<Commit, E>, not Result<Vec<Commit>, E>, so I need a flattening operation, but:

  • I want to stop all processing once the first error is encountered, but I want to preserve that error. Flattening a Result<T, E> directly discards errors, because iterating over a Result yields only its Ok value.
  • I think itertools' flatten_ok does what I need, but it only works for Iterator, not ParallelIterator. Rayon has ParallelBridge, which converts Iterator -> ParallelIterator but not vice versa, so I don't think there's any way for me to use flatten_ok as is.
  • I want to avoid ever collecting the entire dataset into memory. The number of Commits produced for each History is small, so keeping some Vec<Commit> in memory is fine, but the overall stream of them can't fit.
  • I could use fold to build up one big vector of the Ok contents from the smaller vectors and do error detection along the way, but that is essentially collect, which is what I'm trying to avoid.
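To make the transposition in the list above concrete, here is a minimal std-only sketch of the per-batch step: it turns one Result<Vec<T>, E> into an iterator of Result<T, E>, keeping the error as a single Err item instead of dropping it. The name flatten_batch is hypothetical and not part of itertools or Rayon. My assumption, untested here, is that a helper of this shape could be passed to Rayon's flat_map_iter, which takes a closure returning a serial iterator.

```rust
/// Hypothetical helper: turn one fallible batch into per-item results.
/// An Ok batch yields each element wrapped in Ok; an Err batch yields
/// exactly one Err item, so the error is preserved rather than discarded.
fn flatten_batch<T, E>(batch: Result<Vec<T>, E>) -> impl Iterator<Item = Result<T, E>> {
    let (items, error) = match batch {
        Ok(items) => (items, None),
        // Vec::new() does not allocate, so the error path stays cheap.
        Err(e) => (Vec::new(), Some(e)),
    };
    // Option<Result<T, E>> is IntoIterator, so it can be chained directly.
    items.into_iter().map(Ok).chain(error.map(Err))
}
```

Only one small Vec per batch is alive at a time, so nothing here requires collecting the whole stream.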

Do I need to define my own ParallelIterator to do this? Or is there some easy combination I'm missing?

Sketch of what I have:

let filtered_history: Result<Vec<Commit>, Error> = HistoryIterator::new(commit.clone())
    .enumerate()
    .par_bridge()
    .map(|x| {
        if early_exit.load(Ordering::Relaxed) {
            info!("Ctrl-c detected, ending loading.");
            return None;
        }
        Some(x)
    })
    .while_some()
    .map(|(count, commit)| -> Result<Vec<Commit>, Error> {
        // ... implementation details here
    })
    // TODO: something here to flatten incoming stream of
    // Result<Vec<Commit>, E> into a stream of Result<Commit, E>
;
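For the consuming side, the following is a sequential stand-in using only std, written to illustrate the "stop at the first error but keep it" behaviour I'm after, combined with the early-exit flag from the sketch. The function name process and the u32/String types are placeholders. Rayon's ParallelIterator also has a try_for_each method, but I'd assume that in the parallel case some items may already be in flight when the first error shows up.

```rust
use std::sync::atomic::{AtomicBool, Ordering};

/// Hypothetical consumer: handle items until the stop flag is set or the
/// first Err arrives. Returns how many items were handled, or that error.
fn process<I>(results: I, early_exit: &AtomicBool) -> Result<usize, String>
where
    I: Iterator<Item = Result<u32, String>>,
{
    let mut handled = 0usize;
    results
        // Stop pulling new items once the flag has been raised.
        .take_while(|_| !early_exit.load(Ordering::Relaxed))
        // try_for_each short-circuits on the first Err and returns it.
        .try_for_each(|item| -> Result<(), String> {
            let _value = item?;
            handled += 1;
            Ok(())
        })?;
    Ok(handled)
}
```

Because the consumer pulls items one at a time, nothing after the first error is ever requested from the upstream iterator.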

I filed a ticket about this in case anyone else has the same question later.
