Async Iterator (aka Stream) – Which crate to use?

There’s StreamExt::for_each, among other methods, for simplifying operations on stream items:

use futures::stream::StreamExt;
self.some_immutable_method(/*…*/).for_each(|item| {
    /* … */
}).await;
self.some_mutable_method(/* … */);

Oh, that's nice. But I still have a problem with futures::stream::StreamExt::for_each, because I don't think I can use the ? operator with it. My case is more like the following:

let mut stream = self.some_immutable_method(/* … */);
while let Some(result) = stream.next().await {
    let item = result?;
    /* … */
}
drop(stream);
self.some_mutable_method(/* … */);

There’s TryStreamExt::try_for_each, which should be usable here. Turning

let mut stream = self.some_immutable_method(/* … */);
while let Some(result) = stream.next().await {
    let item = result?;
    /* … */
}
drop(stream);

into

use futures::stream::TryStreamExt;
self.some_immutable_method(/*…*/).try_for_each(|item| {
    /* … */
    Ok(())
}).await?;

should probably work (haven’t tested it).

For some reason it doesn't work in my code yet (but that's probably just a mistake on my side when converting your example to my code, which is actually a bit more complex).

Also, in most places of my code, I have multiple occurrences of the ? operator (i.e. not just on the items of the stream but also in subsequent I/O operations that I execute for each item). Thus try_for_each won't work there in either case :frowning:

It's more like this:

let mut stream = self.some_immutable_method(/* … */);
while let Some(result) = stream.next().await {
    let item = result?;
    do_something(item)?;
}
drop(stream);
self.some_mutable_method(/* … */);

So anything that involves a closure is difficult (unless the closure can also return a Result).

And I just noticed, it actually does!

So this is what works:

use futures::stream::TryStreamExt;
foo().try_for_each(|item| {
    /* … */
    futures::future::ready(Ok(()))
}).await?;

Not sure how to use the ? operator here though, as the closure must return a future for a Result, and not directly a Result. Maybe something like:

use futures::stream::TryStreamExt;
foo().try_for_each(|item| {
    futures::future::ready({
        /* … */
    })
}).await?;

But then I still cannot use .await inside the loop.

I'll look more into that later, but even if your proposal works, it's still two and a half lines of code.

No, it can work. Notice that the closure returns a Result as well (see the Ok(()) return value), so you can use ? inside of it.

Edit: I should’ve continued reading the rest of your post first…

Ah, maybe try something like

.try_for_each(|item| {
    async move {
        /* ... */ // can use .await and ? in here
        Ok(())
    }
})

Sorry for not telling you my later findings first in the post :wink:

I tried that too. Due to the other things happening in my loop, I now get a couple of errors like

  • error[E0507]: cannot move out of foo, a captured variable in an FnMut closure
  • error[E0382]: use of moved value: bar

Besides, the async move { … } adds another line (and a half) to the code. And the line count is what I actually wanted to reduce. :sweat_smile:

I’d assume it gets formatted as

.try_for_each(|item| async move {
    /* ... */ // can use .await and ? in here
    Ok(())
})

anyways.

right… maybe leave out the move xD – I don’t know why exactly I placed it there

Edit: I guess my assumption was that I’d like to make sure that item is captured by value. It’s hard to write/debug Rust code through a forum. If item implements Copy, it might be hard to avoid the async move; if it doesn’t, it might be enough to make sure that item is actually consumed inside of the async {} block.

My code isn't public (yet), and boiling it down to a minimal example is not as easy right now. Anyway, thanks for your help so far. Leaving out the move doesn't work either btw., but I'll keep try_for_each in mind.

I'm generally not so fond of iteration using closures (also in non-async code): return changes its meaning, issues with "?" etc. Maybe I should be more open to it. I'll definitely keep try_for_each in mind, though.
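To illustrate the point about return with plain, non-async code (std only; both function names are made up): the same return statement leaves the whole function in a for loop, but only the current closure call in for_each.

```rust
/// `for` loop: `return` leaves the whole function at the first even number.
pub fn first_even_loop(values: &[i32]) -> Option<i32> {
    for &v in values {
        if v % 2 == 0 {
            return Some(v);
        }
    }
    None
}

/// Closure-based variant: `return` only ends the current closure call, so
/// iteration continues and the last even number wins.
pub fn even_via_closure(values: &[i32]) -> Option<i32> {
    let mut found = None;
    values.iter().for_each(|&v| {
        if v % 2 == 0 {
            found = Some(v);
            return; // returns from the closure, not from the function
        }
    });
    found
}
```

For [1, 2, 3, 4] the loop version yields Some(2) while the closure version yields Some(4).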

I thought since Rust doesn't support the for … in syntax for streams (yet?), there might be some ready-to-use macro around to achieve the same (allowing usage of return and ? as in any other for-loop). Maybe I should try implementing one as an exercise.

The tokio-stream docs propose to use while let Some(value) = stream.next().await to iterate over a stream. But in their example, the drop isn't necessary (and not mentioned).

I just played around a bit and came up with this:

use futures;
macro_rules! for_stream_item {
    ( $a:ident in $b:expr, $c:block ) => {
        {
            let mut stream_ = $b;
            while let Some($a) =
                $crate::futures::stream::StreamExt::next(&mut stream_).await
                $c
        }
    }
}

To be used as follows:

for_stream_item!(my_item in some_stream_returning_expression(), {
    my_item.foo()
})

Is that sound? I don't like the comma, but didn't find a way to get rid of it.

P.S.: Maybe a pin_mut!(stream_); should be added.