I don't want any extra allocation.
I don't need to collect the results.
But I want it to return the error immediately if any future fails.
use futures::stream::{self, StreamExt, TryStreamExt};
// This is what I do now.
// Is there a way to avoid the `collect`?
stream::iter().buffer_unordered(x).try_collect::<Vec<_>>().await?;
let mut okay = false;
let mut st = stream::iter().buffer_unordered(x);
while let Ok(p) = st.try_next().await {
    if p.is_none() {
        // The stream finished without any error.
        okay = true;
        break;
    }
}
if !okay {
    // The loop exited early, which means some future returned an Err.
}
Yes, I have looked at that before, but it doesn't seem to fit my requirement.
// Official example
use futures::future;
use futures::stream::{self, TryStreamExt};
let mut x = 0i32;
{
    //        ↓↓↓↓↓ These futures might return an error.
    let fut = stream::repeat(Ok(1)).try_for_each(|item| {
        // I don't understand what `item` is here.
        // If we replace the `Ok(1)` with `Err(1)`, how would that error be emitted?
        x += item;
        future::ready(if x == 3 { Err(()) } else { Ok(()) })
    });
    assert_eq!(fut.await, Err(()));
}
assert_eq!(x, 3);
FromIterator is only relevant for .collect() on an Iterator. The .collect() or .try_collect() methods on futures::stream::[Try]StreamExt both work with Default + Extend<Item> bounds instead.
Now, () does actually implement those too, but in either case (FromIterator or Default + Extend<Item>) it only supports the item type (). So you'd have to get rid of the items first, e.g. via .map(drop) or .map_ok(drop) respectively, to handle a Stream/TryStream with a non-() item type.