Write this for loop as a map?

Notice the .await, which is confusing me.

Is there a nice way to rewrite this as a parts.map(|(k, w)| ...).collect::<Vec<_>>() ?

                let mut ans = vec![];
                for (k, w) in parts {
                    let t2 = std::future::ready(Err(Xos_Mailbox_Err::Timeout));
                    ans.push((k, w.or(t2).await));}

Here, parts: Vec<(K, Future)>

Are you looking for combination of stream::iter, StreamExt::map and StreamExt::collect? Can't check this myself right now, but sounds much like this.

1 Like

You can use StreamExt::flat_map or unfold

Flat Map

Playground

use std::time::Duration;

use futures::{stream::once, FutureExt, StreamExt};

#[tokio::main]
async fn main() {
    let parts = vec![
        (1usize, tokio::time::sleep(Duration::from_secs_f32(0.5))),
        (2, tokio::time::sleep(Duration::from_secs_f32(0.5))),
    ];

    let stream =
        futures::stream::iter(parts.into_iter()).flat_map(|(a, b)| once(b.map(move |b| (a, b))));

    let output = stream.collect::<Vec<(usize, ())>>().await;

    println!("{output:?}");
}

Unfold

Playground

use std::time::Duration;

use futures::{stream::unfold, StreamExt};

#[tokio::main]
async fn main() {
    let parts = vec![
        (1usize, tokio::time::sleep(Duration::from_secs_f32(0.5))),
        (2, tokio::time::sleep(Duration::from_secs_f32(0.5))),
    ];

    let stream = unfold(parts.into_iter(), |mut parts| async move {
        if let Some(next) = parts.next() {
            Some(((next.0, next.1.await), parts))
        } else {
            None
        }
    });

    let output = stream.collect::<Vec<(usize, ())>>().await;

    println!("{output:?}");
}

Frankly they're both a little verbose, though I personally find the unfold version easier to read

2 Likes

Well, this part doesn't need to be this complicated though.

let streams = futures::stream::iter(parts).then(|(a, b)| async { (a, b.await) });

For the OP's code

use futures::stream::{self, StreamExt};

let ans: Vec<_> = stream::iter(parts)
    .then(|(k, w)| await { (k, w.await.map_err(|_| XosMailboxErr::Timeout)) })
    .collect().await;

I assumed the .or() is some custom method which acts like TryFutureExt::or_else() but accepts value directly instead of taking closure produces value.

1 Like

My fault for not being clear. or refers to:

Waits until one of the two futures is ready. If both are ready, takes the first one.

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.