Using question mark in StreamExt::for_each

I'm playing with async Rust. How can I use the question mark in StreamExt::for_each?

Below is a small example modified from the documentation.

    use std::error::Error;
    use std::num::ParseIntError;
    use futures::future;
    use futures::stream::{self, StreamExt};

    fn transform(s: &str) -> Result<i32, ParseIntError>{
        s.parse()
    }

    #[tokio::main]
    async fn main() -> Result<(), Box<dyn Error>> {
        let fut = stream::repeat("abc").take(3).for_each(|x| {
            transform(x)?          // NOT WORK

            future::ready(())
        });
        fut.await;

        Ok(())
    }

I think I could understand that the document says F should return Future<Output = ()>. But in this case how can one use the question mark?

Or is there another idiomatic way to handle error inside the closure?

Thanks.

You can only use the ? operator in functions and closures that return Result or Option. The closure passed to StreamExt::for_each does not (it returns a future that resolves to ()). Even if it did, it wouldn't exit the outer function (i.e. main in your case) propagating the error, but only the closure that is being executed for each element of the stream.

You could handle streams containing fallible operations with the TryStreamExt trait. TryStreamExt::try_collect for example tries to collect all the items in a stream that contains elements of type Result<T, E>:

use futures::stream::{self, StreamExt, TryStreamExt};
use std::error::Error;
use std::num::ParseIntError;

fn transform(s: &str) -> Result<i32, ParseIntError> {
    s.parse()
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let fut: Result<Vec<i32>, ParseIntError> = stream::repeat("abc")
        .take(3)
        .map(|x| transform(x))
        .try_collect()
        .await;

    fut?;

    Ok(())
}

Playground.

Or probably even easier, you can use a plain old while-loop. It doesn't involve closures at all and allows you to use ? to short-circuit your main function:

use futures::stream::{self, StreamExt};
use std::error::Error;
use std::num::ParseIntError;

fn transform(s: &str) -> Result<i32, ParseIntError> {
    s.parse()
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    while let Some(x) = stream::repeat("abc").take(3).next().await {
        transform(x)?;
    }

    Ok(())
}

Playground.

2 Likes

That's a key point. You reply helps a lot, thank you!

1 Like

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.