Flatten an Iterator of Result<Vec<T>, Error> while collecting it

Hey, rustaceans! I'm wondering if it's possible to collect an iterator of Result<Vec, Error> and, while doing it, flatten the data structure. That is, do it in a single iteration.

My current solution involves two iterations, hence why I'm wondering if this can be done in a more efficient way.

Here's some dummy code for illustrative purposes and a playground:

use std::error::Error;
use futures::future::join_all;
  
#[tokio::main]
async fn main() {
    async fn duplicate(number: i32) -> Result<Vec<i32>, Box<dyn Error>> {
        Ok(vec!(number * 2))
    }

    let my_numbers = vec!(1, 2, 3, 4, 5);

    let future_duplicated_evens = my_numbers.into_iter().filter_map(|number| {
            if number % 2 == 0 {
            Some(duplicate(number))
        } else {
            None
        }
    }).collect::<Vec<_>>();

    let flattened = join_all(future_duplicated_evens).await.into_iter().collect::<Result<Vec<_>, Box<dyn Error>>>().unwrap().into_iter().flatten().collect::<Vec<i32>>();
    
    println!("Flattened: {:?}", flattened);
} 

Playground.

You can do it with itertools::process_results. The same kind of function (i.e. an equivalent std-internal function) is used in the implementation of FromIterator that allows .collecting into a Result<Vec<…>, _>.

use futures::future::join_all;
use itertools::{process_results, Itertools};
use std::error::Error;

#[tokio::main]
async fn main() {
    async fn duplicate(number: i32) -> Result<Vec<i32>, Box<dyn Error>> {
        Ok(vec![number * 2])
    }

    let my_numbers = vec![1, 2, 3, 4, 5];

    let future_duplicated_evens = my_numbers
        .into_iter()
        .filter_map(|number| {
            if number % 2 == 0 {
                Some(duplicate(number))
            } else {
                None
            }
        })
        .collect_vec();

    let flattened = process_results(join_all(future_duplicated_evens).await, |i| {
        i.flatten().collect_vec()
    })
    .unwrap();

    println!("Flattened: {:?}", flattened);
}

(in the playground)

(While I’m at it, I’ve introduced usage of Itertools::collect_vec to write .collect_vec() instead of .collect::<Vec<_>>.)

You can use try_fold(..) to do it manually in one iteration.

I'm not sure if it would be more or less efficient to make a separate iteration counting the items first, to do fewer allocations, it probably depends on how the input looks.

1 Like

Another way with process_results, but using Itertools::concat, (which does a fold with extend internally, so some similarity to @kaj’s approach):

let flattened =
    process_results(join_all(future_duplicated_evens).await, |i| i.concat()).unwrap();

playground


Edit: Actually, concat uses fold1; it’s extending the first Vec item, so we’re even saving one allocation compared to the explicit try_fold approach. (Vec::new is only used in case the iterator is completely empty.)

1 Like

Awesome! Thanks guys :smiley: !

Your example should probably use try_join_all because that has the additional benefit of cancelling the other futures in case one returns an Err.

let flattened = concat(try_join_all(future_duplicated_evens).await.unwrap());

(playground)

(Note that this also changes the behavior in that you get the error of the future that finishes first, not the error that comes first in the list.)

Obviously, I’m assuming that this would be used with a less trivial async fn, in particular one that can actually return Err sometimes.

2 Likes

Great! Didn't know there was this short-circuiting alternative :D.

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.