Is there a way to execute the `Stream` without collect it

I don't want extra allocation.
I don't need to collect it.
But I want it to return the error immediately if any error occurs.

use futures::stream::{self, StreamExt, TryStreamExt};

// This is what I do now
// Is there a way to avoid the `collect`
stream::iter().buffer_unordered(x).try_collect().await?;

What Stream from which library?

Updated.

Well, you could:

let mut okay = false;
let st = stream.iter().buffer_unordered(x);
while let Ok(p) = st.try_next().await {
    if p.is_none() {
        okay = true;
        break;
    }
}
if !okay {
    // Which means loop exited early, so some future returned an Err
}

It looks like you create multiple streams if the first-next is some.
But I get what you mean.

Fixed it.

Looks like there's a TryStreamExt::try_for_each

1 Like

Yes, I have checked that before. But it looks like not fit my requirement.

// Official example
use futures::future;
use futures::stream::{self, TryStreamExt};

let mut x = 0i32;

{
//                           ↓↓↓↓↓ These futures might return an error.
    let fut = stream::repeat(Ok(1)).try_for_each(|item| {
// I don't know why the `item` is an 'item'.
// If we replace the `Ok(1)` with `Err(1)`, how can I emit that error.
        x += item;
        future::ready(if x == 3 { Err(()) } else { Ok(()) })
    });
    assert_eq!(fut.await, Err(()));
}

assert_eq!(x, 3);
use futures::stream::{self, StreamExt};

let mut tasks = stream::iter(x).buffer_unordered(x);
while let Some(r) = tasks.next().await {
    r?;
}

Ok(())

another option:. () implements FromIterator, so you can also use that to .collect() without allocating

1 Like

FromIterator is only relevant for .collect() on an Iterator. The .collect() or .try_collect() methods on futures::stream::[Try]StreamExt both work with Default + Extend<Item> bounds instead.

Now, () does actually implement those too, but in either case (FromIterator or Default + Extend<Item>), it only supports item-type () though, so you'd have to get rid the items first, e.g. via a .map(drop) or .map_ok(drop) respectively, to handle Stream/TryStream of a non-() item type.

2 Likes

ah right. thanks for the clarification.

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.