Async Stream with Item = Result<_>

Trying to get the following code to compile:

use std::error::Error;
use futures::stream::{self, Stream, StreamExt};
use std::collections::HashSet;

async fn foo() -> impl Stream<Item = Result<(), Box<dyn Error>>> {
    stream::iter(vec![]).then(move |val|
        async move {
            // In reality this is a function call that returns a Result
            let v = val?;
            // In reality this is a function call that returns a HashSet for each `v`
            let values = HashSet::new();
            // I guess I need to keep the Ok() because I use `?` within this future
            Ok(values.into_iter())
        }
    )
}

I get the following error:

   |
12 | async fn foo() -> impl Stream<Item = Result<(), Box<dyn Error>>> {
   |                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ expected `()`, found struct `std::collections::hash_set::IntoIter`
   |
   = note: expected enum `Result<(), Box<(dyn std::error::Error + 'static)>>`
              found enum `Result<std::collections::hash_set::IntoIter<_>, _>`

Need to apply some mapper to make a Stream out of the HashSet I guess, but how?

You want to make a stream out of the hash sets? Maybe you need to use flatten or something like that? It's hard to understand your problem with the code you posted, because it's very simplified. In particular, I don't quite understand if the () type is what you want... are there no items (besides errors)? What kind of type does the hashset contain and what exactly do you want do do with it?

Exactly, I want a stream out of the hash set. The real item type is something like a Request object, so the function would return a Stream<Item=Request> where the individual Request objects would be created within the async move { ... }.

I did try flatten, but didn't get far.

Here's a slightly more sophisticated version. Maybe that makes it a little clearer:

use std::error::Error;
use futures::stream::{self, Stream, StreamExt};
use std::collections::HashSet;

type MyResult<T> = Result<T, Box<dyn Error>>;

// My input type
#[derive(Hash, PartialEq, Eq)]
struct Url;
// My output type
struct Request;


async fn foo(urls: Vec<MyResult<Url>>) -> impl Stream<Item = MyResult<Request>> {
    stream::iter(urls).then(move |url|
        async move {
            // In reality this is a function call that returns a Result
            let u = url?;
            // In reality this is a function call that returns a HashSet for each `v`
            let requests = HashSet::from_iter(vec![u]);
            // I guess I need to keep the Ok() because I use `?` within this future
            Ok(requests.into_iter())
        }
    ).flatten()
}

Playground

It can be fixed like this: Rust Playground . Since you're working with results, you would need try_flatten instead of flatten. try_flatten will propagate errors properly. But try_flatten requires the stream's item to be a stream of results as well, so we need map(Ok) to convert an iterator of values into an iterator of Results and stream::iter to convert that to a stream.

I think it's better to use stream_generator or async_stream for this case. It would make the code more readable.

2 Likes

Wow, that's extremely helpful. Thanks for the impressive fix @Riateche. I'll also look into the other crates you mentioned.
Also thanks to @steffahn for the support. I think we can close the case. :blush:

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.