Idiomatic way of splitting iterator of Result into two iterators (Ok, Err)

Greetings.

I am sure this question has been asked before, but I am having trouble finding a suitable answer today; my apologies.

I would like to know an idiomatic, high-performance Rust workflow for the following: I have a single iterator producing a stream of many millions of Result<T, E> values, and I want to split that one iterator into 2 separate iterators, one yielding only the Ok values and one yielding only the Err values.

Similarly, I would like one input iterator of Option values to be split into 2 iterators of only Some and only None values, each with a tag of some sort to help identify the input.

I want to do this split in one pass, with high performance and minimum of memory.

Ideally workflow would be using standard Rust library or perhaps using itertools library, or another high quality Rust library.

Should I use input_iter.iter().split_by() or split_by_key() as the best solution for this?

If there is a web link to a previous solution, please link here.
Using Rust 1.77, edition 2021.

thank you.

You can't, really.

Suppose you had a function you could call that gave you ok_iter and err_iter.

What has to happen when you call err_iter.next()? Well, it has to go through the iterator until it finds the first error. There might not be any, so it might have to iterate the whole thing to know to return None.

Then you call ok_iter.next(). Where does it get the successes from? The err_iter.next() call must have put them somewhere, which means it needed to allocate space for them, which makes the whole thing non-lazy and O(n) in space.

You might be interested in reading the various options listed in Iterating over Results - Rust By Example
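For reference, one of the options that chapter covers is std's Iterator::partition, which does the split eagerly in a single pass (buffering everything, per the above). A minimal sketch, assuming you can afford two Vecs:

```rust
fn main() {
    let results: Vec<Result<i32, String>> =
        vec![Ok(1), Err("oops".to_string()), Ok(2)];

    // Partition into two Vecs of Results in a single pass...
    let (oks, errs): (Vec<_>, Vec<_>) =
        results.into_iter().partition(Result::is_ok);

    // ...then unwrap each side, since partition keeps the Result wrapper.
    let oks: Vec<i32> = oks.into_iter().map(Result::unwrap).collect();
    let errs: Vec<String> = errs.into_iter().map(Result::unwrap_err).collect();

    assert_eq!(oks, [1, 2]);
    assert_eq!(errs, ["oops"]);
}
```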

10 Likes

The laziest version is probably something like this, but it still needs to cache O(N) items to work properly:

use std::collections::VecDeque;

pub struct ResultIter<T, E, I> {
    iter: I,
    // Items set aside while scanning for the other variant:
    // Ok(_) buffers T values skipped over by next_err,
    // Err(_) buffers E values skipped over by next_ok.
    pending: Result<VecDeque<T>, VecDeque<E>>,
}

impl<T, E, I:Iterator<Item=Result<T,E>>> From<I> for ResultIter<T, E, I> {
    fn from(iter:I)->Self {
        ResultIter { iter, pending: Ok(VecDeque::new()) }
    }
}

impl<T, E, I:Iterator<Item=Result<T,E>>> ResultIter<T, E, I> {
    pub fn next_ok(&mut self)->Option<T> {
        if let Ok(v) = &mut self.pending {
            match v.pop_front() {
                Some(x) => { return Some(x); }
                None => { self.pending = Err(VecDeque::new()) }
            }
        }

        let &mut Err(ref mut v) = &mut self.pending else { unreachable!() };

        for x in &mut self.iter {
            match x {
                Ok(x) => return Some(x),
                Err(e) => { v.push_back(e); }
            }
        }

        None
    }

    pub fn next_err(&mut self)->Option<E> {
        if let Err(v) = &mut self.pending {
            match v.pop_front() {
                Some(x) => { return Some(x); }
                None => { self.pending = Ok(VecDeque::new()) }
            }
        }

        let &mut Ok(ref mut v) = &mut self.pending else { unreachable!() };

        for x in &mut self.iter {
            match x {
                Err(e) => return Some(e),
                Ok(x) => { v.push_back(x); }
            }
        }

        None
    }

    pub fn split(self)->(impl Iterator<Item=T>, impl Iterator<Item=E>) {
        use std::rc::Rc;
        use std::cell::RefCell;
        let cell1 = Rc::new(RefCell::new(self));
        let cell2 = cell1.clone();

        let ok_iter = std::iter::from_fn(move || cell1.borrow_mut().next_ok());
        let err_iter = std::iter::from_fn(move || cell2.borrow_mut().next_err());

        (ok_iter, err_iter)
    }
}
3 Likes

As others have explained, you'll potentially need to buffer many elements. If you're ok with that, you could use Itertools::partition_result, which will eagerly collect the iterator into two different Vecs.

This is the example from its docs:

let successes_and_failures = vec![Ok(1), Err(false), Err(true), Ok(2)];
let (successes, failures): (Vec<_>, Vec<_>) = successes_and_failures
    .into_iter()
    .partition_result();
assert_eq!(successes, [1, 2]);
assert_eq!(failures, [false, true]);
5 Likes

It would be possible to do this without any arbitrary-length buffering, by using two threads that separately consume the two iterators, each blocking until an element of the right type is available. Of course, that might not suit the application.
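A minimal sketch of that idea in std only (the names and the channel choice are my own, not from this thread): a driver thread consumes the source iterator and forwards each item to the matching channel, and the bounded sync_channel provides the blocking backpressure, so memory stays capped at the channel capacities.

```rust
use std::sync::mpsc::sync_channel;
use std::thread;

fn main() {
    // A stand-in source: every third item is an Err.
    let source = (0..10).map(|i| if i % 3 == 0 { Err(i) } else { Ok(i) });

    // Bounded channels: the driver blocks when a consumer falls behind.
    let (ok_tx, ok_rx) = sync_channel(4);
    let (err_tx, err_rx) = sync_channel(4);

    let driver = thread::spawn(move || {
        for item in source {
            match item {
                Ok(v) => ok_tx.send(v).unwrap(),
                Err(e) => err_tx.send(e).unwrap(),
            }
        }
        // The senders drop here, which ends both receiver iterators.
    });

    // Each Receiver is itself an iterator; consume them on separate threads.
    let ok_sum = thread::spawn(move || ok_rx.into_iter().sum::<i32>());
    let err_sum: i32 = err_rx.into_iter().sum();

    driver.join().unwrap();
    assert_eq!(ok_sum.join().unwrap(), 1 + 2 + 4 + 5 + 7 + 8);
    assert_eq!(err_sum, 0 + 3 + 6 + 9);
}
```

Both consumers must actually run, or the driver eventually blocks forever on a full channel; that is the "drive the two parts in step" caveat raised later in this thread.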

3 Likes

And if you're ok with pulling it all into memory, and this is really what you want to do, you could perhaps split it into two Vecs:

let mut oks = Vec::new();
let mut errs = Vec::new();
for result in results {
    match result {
        Ok(ok) => oks.push(ok),
        Err(err) => errs.push(err),
    }
}
for ok in oks {
    // process each success here
}
for err in errs {
    // process each failure here
}

But yeah, maybe you could clarify your goals some more? Because if your goal is to simultaneously 1. start from some arbitrary iterator of arbitrarily intermixed Oks and Errs, 2. have a single loop that processes all of the Oks first and then a second loop that processes all of the Errs after that (or vice versa), and 3. pull each cache line from RAM to the CPU only once, then that's simply not a possible algorithm, regardless of language; you'll have to relax at least one of those requirements.

1 Like

thank you everyone!

Due to the size of the data (I/O, streaming, hundreds of millions of items flowing continuously), I want to avoid storing everything in memory (except perhaps in buffer-like chunks?).

So I was hoping for a traditional "lazy" iterator solution which consumes on the fly and self-throttles.

I think the closest solution would be the one mentioned in a reply above: 2 dedicated threads, one for the Ok values and one for the Err values (or one for Some and one for None), probably connected with something like the standard library's mpsc channels, one channel (single producer, single consumer) per thread.

Would such a design scale and perform optimally?

Is a design with 2 dedicated threads a better fit than using tokio async? (This is network and disk/SSD I/O.)

thank you.

1 Like

Would such a design scale and perform optimally?

It's impossible to say, without details on what the data is and how it is to be processed in the two cases, and difficult to say even if we had those details.

For example, it might turn out to be wasteful to use multiple threads — perhaps the processing can be handled better by a single thread not trying to split the data into two iterators, but just branching on each item. Or not. It will definitely depend on the cost of whatever operation produces the Ok/Err cases, and the two operations for those two cases.

Such questions are best answered empirically, by building a prototype, then profiling and benchmarking. You do not need to, and essentially cannot, get it perfect by design up front.

5 Likes

You could also do this on a single thread using async but you'll have to switch from Iterator to Stream.

4 Likes

I suspect you're looking for partition_map from rayon.

If the goal is high-throughput performance with an Iterator, there's probably a rayon method for it: take a look through there to see if there's a more direct way to express what you're attempting.

2 Likes

I think you’re going to need to share specific details about your use case if you want any answers that are more specific.

2 Likes

thank you,

I am very happy with the answers so far, I was looking for general design guidance which I have now received.

Best regards and thanks to everyone who replied.

1 Like

It is acceptable for an Iterator to resume iteration after returning None. It would be valid for an implementation of let (ok_iter, err_iter) = result_iter.split_ok_err(); to return two iterators that each yield None until the next value in the source iterator matches their expected Result variant. That implementation would only require enough memory to store a single Result<T, E>, for example by using .peek() on the source iterator.

Here's a possible implementation.
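A self-contained sketch of that idea in safe std (my own code, not necessarily the linked implementation): both halves share a Peekable source, and each one consumes an item only when the item matches its variant, returning None (and resuming later) otherwise.

```rust
use std::cell::RefCell;
use std::rc::Rc;

// Split into two resuming iterators that buffer at most the single
// peeked-at item held inside Peekable.
fn split_ok_err<T, E, I>(iter: I) -> (impl Iterator<Item = T>, impl Iterator<Item = E>)
where
    I: Iterator<Item = Result<T, E>>,
{
    let shared = Rc::new(RefCell::new(iter.peekable()));
    let shared2 = Rc::clone(&shared);

    let oks = std::iter::from_fn(move || {
        let mut it = shared.borrow_mut();
        if matches!(it.peek(), Some(Ok(_))) {
            it.next()?.ok() // an Ok is next: consume it
        } else {
            None // an Err (or the end) is next: yield None, resume later
        }
    });
    let errs = std::iter::from_fn(move || {
        let mut it = shared2.borrow_mut();
        if matches!(it.peek(), Some(Err(_))) {
            it.next()?.err()
        } else {
            None
        }
    });
    (oks, errs)
}

fn main() {
    let (mut oks, mut errs) = split_ok_err(vec![Ok(1), Err("a"), Ok(2)].into_iter());
    assert_eq!(oks.next(), Some(1));
    assert_eq!(oks.next(), None); // not finished: an Err is next
    assert_eq!(errs.next(), Some("a"));
    assert_eq!(oks.next(), Some(2)); // resumed after the None above
    assert_eq!(errs.next(), None);
}
```

As discussed below, the caller needs extra knowledge to use this correctly: a None from one half only means "try the other half (or stop if both are None)", not "finished".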

1 Like

That's just "not UB", not what I'd call "acceptable". It'd mean that you can't really use any of the combinators in std, for example: any of them might freeze internally, making them not work at all, and things like .collect() wouldn't do anything useful. So I'd hardly call it an iterator any more. (And you have the problem that, if you have only one of the iterators, you have no way to know when you're done.)

So sure, you could do something like that, but at that point you might as well just have a different interface for clarity. Or just stop doing this and have one loop feeding the consumption, because you clearly need to drive the two parts in step anyway (so you can call next on one after the other returns None), at which point the split isn't doing anything useful.

3 Likes

I don't agree with this. I've definitely (purposefully) used iterators that iterate to a point, then restart, and similar patterns. Adapters (combinators) are still useful, because &mut iterator also implements Iterator.

std::sync::mpsc::TryIter is a std iterator intentionally built around resuming.
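To illustrate (my example, not from the post): a single TryIter yields None whenever the channel happens to be empty, and the same iterator happily resumes once more items arrive.

```rust
use std::sync::mpsc::channel;

fn main() {
    let (tx, rx) = channel();
    let mut it = rx.try_iter(); // one iterator, used across "pauses"

    tx.send(1).unwrap();
    assert_eq!(it.next(), Some(1));
    assert_eq!(it.next(), None); // empty right now, but not finished

    tx.send(2).unwrap();
    assert_eq!(it.next(), Some(2)); // the same iterator resumes
}
```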

If you need to rely on an iterator never resuming, that's what .fuse() and FusedIterator are for.

You're right, the next doc allows for this:

Returns None when iteration is finished. Individual iterator implementations may choose to resume iteration, and so calling next() again may or may not eventually start returning Some(Item) again at some point.

As I see it, in order for iterators which resume to be useful, you need some extra knowledge about when to try calling .next() again. For example, in the application discussed in this thread, you might have the extra knowledge “if both iterators return None, then there are no more items in the whole process and I should stop trying either one”. If you're reading from stdin, you might have the extra knowledge “every time I see a None, that's the user hitting ^D (EOF)”. But whether using a resuming iterator is a good idea, and how to do so, depends on the details of that extra knowledge.

5 Likes

That's true, but my issue here is that if one thing needs to drive both iterators anyway, that seems strictly worse than that one thing just driving the one iterator and matching on the Result it gets back.

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.