Notice the .await, which is confusing me.
Is there a nice way to rewrite this as a parts.map(|(k, w)| ...).collect::<Vec<_>>()?
let mut ans = vec![];
for (k, w) in parts {
    let t2 = std::future::ready(Err(Xos_Mailbox_Err::Timeout));
    ans.push((k, w.or(t2).await));
}
Here, parts: Vec<(K, Future)>
Are you looking for a combination of stream::iter, StreamExt::map and StreamExt::collect? I can't check this myself right now, but it sounds much like this.
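To illustrate why the .await gets in the way of a plain map, here is a minimal std-only sketch. Iterator::map takes an ordinary closure, so .await cannot appear inside it; the closure can only return a future, which still has to be awaited somewhere. The names await_in_order, ThreadWaker and block_on are hypothetical helpers written for this example, and block_on is a toy executor standing in for a real runtime such as tokio.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Hypothetical helper: awaits every future in `parts` one after another,
/// keeping each key next to its result.
pub async fn await_in_order<K, F>(parts: Vec<(K, F)>) -> Vec<(K, F::Output)>
where
    F: Future,
{
    // The closure is not async, so it can only *build* futures.
    // Nothing is awaited at this point.
    let pending: Vec<_> = parts
        .into_iter()
        .map(|(k, w)| async move { (k, w.await) })
        .collect();

    // The awaiting still has to happen in an async context.
    let mut ans = Vec::with_capacity(pending.len());
    for fut in pending {
        ans.push(fut.await);
    }
    ans
}

/// Waker that unparks the thread running `block_on`.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Toy executor (an assumption for this sketch): polls one future to
/// completion on the current thread, parking while it is pending.
pub fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(value) => return value,
            Poll::Pending => thread::park(),
        }
    }
}
```

Calling block_on(await_in_order(parts)) with parts built from std::future::ready values returns the pairs in their original order. The stream combinators do the same job without the intermediate Vec of futures.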
You can use StreamExt::flat_map or unfold.
Flat Map
use std::time::Duration;

use futures::{stream::once, FutureExt, StreamExt};

#[tokio::main]
async fn main() {
    let parts = vec![
        (1usize, tokio::time::sleep(Duration::from_secs_f32(0.5))),
        (2, tokio::time::sleep(Duration::from_secs_f32(0.5))),
    ];
    // Turn each (key, future) pair into a one-item stream that yields (key, output).
    let stream =
        futures::stream::iter(parts.into_iter()).flat_map(|(a, b)| once(b.map(move |b| (a, b))));
    let output = stream.collect::<Vec<(usize, ())>>().await;
    println!("{output:?}");
}
Unfold
use std::time::Duration;

use futures::{stream::unfold, StreamExt};

#[tokio::main]
async fn main() {
    let parts = vec![
        (1usize, tokio::time::sleep(Duration::from_secs_f32(0.5))),
        (2, tokio::time::sleep(Duration::from_secs_f32(0.5))),
    ];
    // The iterator is the unfold state; each step awaits the next future.
    let stream = unfold(parts.into_iter(), |mut parts| async move {
        if let Some(next) = parts.next() {
            Some(((next.0, next.1.await), parts))
        } else {
            None
        }
    });
    let output = stream.collect::<Vec<(usize, ())>>().await;
    println!("{output:?}");
}
Frankly, they're both a little verbose, though I personally find the unfold version easier to read.
Hyeonu
Well, this part doesn't need to be this complicated though.
let streams = futures::stream::iter(parts).then(|(a, b)| async move { (a, b.await) });
For the OP's code:
use futures::stream::{self, StreamExt};

let ans: Vec<_> = stream::iter(parts)
    .then(|(k, w)| async move { (k, w.await.map_err(|_| Xos_Mailbox_Err::Timeout)) })
    .collect()
    .await;
I assumed that .or() is some custom method that acts like TryFutureExt::or_else() but accepts a value directly instead of taking a closure that produces one.
My fault for not being clear. .or() refers to:
Waits until one of the two futures is ready. If both are ready, takes the first one.
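To make that description concrete, here is a minimal std-only sketch of a combinator with those semantics. It is not the implementation from whichever crate provides .or(); Or, or, ThreadWaker and block_on are hypothetical names for this example, and both futures are boxed only to avoid unsafe pin projection.

```rust
use std::future::Future;
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Hypothetical combinator: resolves with whichever future is ready first,
/// preferring `first` when both are ready on the same poll.
pub struct Or<A, B> {
    first: Pin<Box<A>>,
    second: Pin<Box<B>>,
}

pub fn or<A, B>(first: A, second: B) -> Or<A, B>
where
    A: Future,
    B: Future<Output = A::Output>,
{
    Or { first: Box::pin(first), second: Box::pin(second) }
}

impl<A, B> Future for Or<A, B>
where
    A: Future,
    B: Future<Output = A::Output>,
{
    type Output = A::Output;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Poll `first` before `second`, so it wins a tie.
        if let Poll::Ready(value) = self.first.as_mut().poll(cx) {
            return Poll::Ready(value);
        }
        self.second.as_mut().poll(cx)
    }
}

/// Waker that unparks the thread running `block_on`.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Toy executor (an assumption for this sketch), standing in for a real runtime.
pub fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(value) => return value,
            Poll::Pending => thread::park(),
        }
    }
}
```

Under these semantics, pairing w with std::future::ready(Err(..)) as in the original loop acts as a non-blocking check: if w is not ready on the first poll, the already-completed ready future wins and the Timeout error is returned immediately.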