Write this for loop as a map?

Notice the .await, which is confusing me.

Is there a nice way to rewrite this as a parts.map(|(k, w)| ...).collect::<Vec<_>>() ?

let mut ans = vec![];
for (k, w) in parts {
    let t2 = std::future::ready(Err(Xos_Mailbox_Err::Timeout));
    ans.push((k, w.or(t2).await));
}

Here, parts: Vec<(K, Future)>

Are you looking for a combination of stream::iter, StreamExt::map and StreamExt::collect? I can't check this myself right now, but it sounds like what you need.

You can use StreamExt::flat_map or stream::unfold.

Flat Map

use std::time::Duration;

use futures::{stream::once, FutureExt, StreamExt};

#[tokio::main]
async fn main() {
    let parts = vec![
        (1usize, tokio::time::sleep(Duration::from_secs_f32(0.5))),
        (2, tokio::time::sleep(Duration::from_secs_f32(0.5))),
    ];

    let stream =
        futures::stream::iter(parts.into_iter()).flat_map(|(a, b)| once(b.map(move |b| (a, b))));

    let output = stream.collect::<Vec<(usize, ())>>().await;

    println!("{output:?}");
}

Unfold

use std::time::Duration;

use futures::{stream::unfold, StreamExt};

#[tokio::main]
async fn main() {
    let parts = vec![
        (1usize, tokio::time::sleep(Duration::from_secs_f32(0.5))),
        (2, tokio::time::sleep(Duration::from_secs_f32(0.5))),
    ];

    let stream = unfold(parts.into_iter(), |mut parts| async move {
        if let Some(next) = parts.next() {
            Some(((next.0, next.1.await), parts))
        } else {
            None
        }
    });

    let output = stream.collect::<Vec<(usize, ())>>().await;

    println!("{output:?}");
}

Frankly, they're both a little verbose, though I personally find the unfold version easier to read.

Well, this part doesn't need to be this complicated though.

let streams = futures::stream::iter(parts).then(|(a, b)| async move { (a, b.await) });

For the OP's code

use futures::stream::{self, StreamExt};

let ans: Vec<_> = stream::iter(parts)
    .then(|(k, w)| async move { (k, w.await.map_err(|_| Xos_Mailbox_Err::Timeout)) })
    .collect().await;

I assumed .or() is some custom method that acts like TryFutureExt::or_else() but accepts a value directly instead of taking a closure that produces one.

My fault for not being clear. The .or() here refers to:

Waits until one of the two futures is ready. If both are ready, takes the first one.
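
To make those semantics concrete, here is a rough std-only sketch of what such a combinator might look like and how the original loop behaves with it. Everything named here (Or, or, block_on, MailboxErr, resolve_all, demo) is made up for illustration and is not the API of whatever crate provides the real .or(); the busy-polling block_on is only there so the sketch runs without an async runtime.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical stand-in for the OP's error type.
#[derive(Debug, PartialEq)]
enum MailboxErr {
    Timeout,
}

type Reply = Result<u32, MailboxErr>;
type BoxedReply = Pin<Box<dyn Future<Output = Reply>>>;

// Hypothetical combinator: resolves to whichever future is ready,
// preferring `first` when both are ready.
struct Or<A, B> {
    first: Pin<Box<A>>,
    second: Pin<Box<B>>,
}

impl<T, A, B> Future for Or<A, B>
where
    A: Future<Output = T>,
    B: Future<Output = T>,
{
    type Output = T;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
        // `first` is polled before `second`, so it wins a tie.
        if let Poll::Ready(value) = self.first.as_mut().poll(cx) {
            return Poll::Ready(value);
        }
        self.second.as_mut().poll(cx)
    }
}

fn or<T, A, B>(first: A, second: B) -> Or<A, B>
where
    A: Future<Output = T>,
    B: Future<Output = T>,
{
    Or { first: Box::pin(first), second: Box::pin(second) }
}

// Waker that does nothing; enough for a busy-polling demo executor.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Minimal executor: polls in a loop until the future completes.
fn block_on<F: Future>(future: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut future = Box::pin(future);
    loop {
        if let Poll::Ready(value) = future.as_mut().poll(&mut cx) {
            return value;
        }
        std::thread::yield_now();
    }
}

// Same shape as the loop in the question: each future is awaited in turn,
// with an already-ready Timeout error as the fallback.
async fn resolve_all(parts: Vec<(&'static str, BoxedReply)>) -> Vec<(&'static str, Reply)> {
    let mut ans = Vec::new();
    for (k, w) in parts {
        let t2 = std::future::ready(Err(MailboxErr::Timeout));
        ans.push((k, or(w, t2).await));
    }
    ans
}

fn demo() -> Vec<(&'static str, Reply)> {
    let ready: BoxedReply = Box::pin(std::future::ready::<Reply>(Ok(7)));
    let stuck: BoxedReply = Box::pin(std::future::pending::<Reply>());
    block_on(resolve_all(vec![("ready", ready), ("stuck", stuck)]))
}
```

Calling demo() should give Ok(7) for the part that is already ready and Err(Timeout) for the one that never completes. In other words, with a ready fallback, each part only gets a single poll before the timeout value is taken.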