How to "join_all_fold" an iterator of streams?

Hi,

I have a vector of streams whose outputs I would like to accumulate / fold, without caring about the order in which the streams produce their results. E.g. something like "join_all"+"fold". However, I can't find the right idiom for this.

use futures::stream::{self, StreamExt};

#[tokio::main]
async fn main() {
    // some streams
    let streams = (0..10)
        .map(|i: i32| {
            stream::iter(i+4..i+5)
        }).collect::<Vec<_>>();

    // what I can't write correctly:
    let result = futures::stream::iter(streams)
        .flatten()
        .fold(Vec::<i32>::new(), |mut acc, batch| async move {
            // For now each item is pushed into a vector, but the goal is to accumulate
            // (e.g. `acc += 1`) so that not all of the data has to be stored.
            acc.push(batch);
            acc
        })
        .await;

    for i in result {
        println!("{:?}", i);
    }
}


This program obviously waits for each stream to finish before starting the next one, due to "stream::iter" followed by "flatten", which drains each inner stream before polling the next.

Basically, I would like to wait on the streams in any order, like join_all, but fold the results in whatever order they appear.

Any ideas?

The solution is to use select_all, which merges the streams into a single stream. Folding over that merged stream then processes elements as they become available, regardless of which input stream produced them.
