Take first non-Error result out of a Stream of Results in Rust

I have a Stream that generates Result's, I want to apply a try_filter_map() on these items and take the first item that does not result in an error.

Consider the following example: (Playground)

use futures::{pin_mut, prelude::*};
use tokio;

#[tokio::main]
async fn main() {
    let s = stream::iter(vec![Ok(1), Ok(2), Ok(3)])
        .try_filter_map(|v| async move { if v == 1 { Err("Ohno!") } else { Ok(Some(v)) } })
        .inspect_err(|err| println!("Error {:?}", err))
        .filter_map(|r| future::ready(r.ok()));
    pin_mut!(s);
    let result = s.next().await;
    println!("{:?}", result);
}

When I run this code I receive the following error:

Error "Ohno!"
thread 'main' panicked at '`async fn` resumed after completion'

I am fairly new to Rust and could use some help to solve this issue.

Why are you using try_filter_map if you're not filtering anything? You can just use and_then:

    let s = stream::iter(vec![Ok(1), Ok(2), Ok(3)])
        .and_then(|v| future::ready(if v == 1 { Err("Ohno!") } else { Ok(v) }))

Well I posted just a simplified example, in my code I am filtering out certain Results.
But thanks anyways, I'll make do with and_then for now and just return Error's if I want to ignore an Item.

Do you know why try_filter_map results in this panic?

Oh, if you need the try_filter_map replacing the async move with future::ready should work.

I don't think that code should panic, it might be a bug in futures.

Yeah this is a bug in try_filter_map that is triggered if you try to access the stream again after it has returned an error. The bug happens here.

Thanks for the quick replies! I'll report the issue to the github repository, since this does not seem to be the documented behaviour.

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.