How to handle errors in Stream

#1

When working with a Stream from the futures crate, what is the best way to handle errors and filter them out of the stream?

The only way I can think to do that is with something like:

stream.then(|item| match item {
  Ok(it) => Ok(Some(it)),
  Err(err) => {
    eprintln!("Got an error: {}", err); // or other error handling
    Ok(None)
  }
}).filter_map(|x| x) 

But this is very awkward. Is there a better way to do this?

#2

Also, the example I gave above doesn’t compile because the compiler can’t infer the error type of the Result, and I’m not sure how to supply it.

#3

The compiler throws an error because you’re returning a Result<Option<T>, E> and it can only infer the type of T, not E. Unless I’m misunderstanding your question, if you want to ignore the errors, you can use filter_map directly:

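use futures::Stream;
use futures::sync::mpsc;

fn main() {
    // The channel's items are themselves Results; the stream's own error type is ().
    let (_tx, rx) = mpsc::channel::<Result<usize, ()>>(1);
    // Log each Err item, then drop it by turning the Result into an Option.
    // The adaptor is lazy: nothing runs until the stream is polled.
    let _oks = rx.filter_map(|v| {
        if let Err(e) = &v {
            eprintln!("{:?}", e);
        }

        v.ok()
    });
}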

#4

Sorry, I should have mentioned that I’m using futures 0.1 (0.3 is unstable and 0.2 has been yanked). In 0.1, filter_map only sees the Ok items of the stream; errors go through the stream’s separate error type, so filter_map can’t filter them out.
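
Given that, one possible way to make the original then + filter_map attempt compile is to name the missing error type with a turbofish on Ok. The sketch below is only an illustration: it uses a plain std Iterator as a stand-in for the Stream so that it runs without the futures crate, and log_and_drop and keep_ok are hypothetical helper names, not part of any library. The unit error type () is also an assumption; any type would do, since the error is never produced.

```rust
use std::fmt::Display;

// Hypothetical helper: log the error (or handle it some other way) and
// turn the Result into an Option so the failure can be filtered out later.
fn log_and_drop<T, E: Display>(item: Result<T, E>) -> Option<T> {
    match item {
        Ok(it) => Some(it),
        Err(err) => {
            eprintln!("Got an error: {}", err);
            None
        }
    }
}

// Hypothetical stand-in for the stream pipeline, using a Vec and Iterator.
fn keep_ok(items: Vec<Result<u32, String>>) -> Vec<u32> {
    items
        .into_iter()
        // Mirrors the closure passed to `then`: every branch returns `Ok`,
        // so nothing pins down the error type. The turbofish supplies it
        // (assumed here to be `()`), which removes the inference failure.
        .map(|item| Ok::<_, ()>(log_and_drop(item)))
        // Mirrors `.filter_map(|x| x)`: drop the `None`s left by the errors.
        .filter_map(|res| res.ok().flatten())
        .collect()
}
```

Calling keep_ok(vec![Ok(1), Err("boom".to_string()), Ok(3)]) logs the one error and keeps the two successful values. With futures 0.1 the same shape should carry over as stream.then(|item| Ok::<_, ()>(log_and_drop(item))).filter_map(|x| x), since then receives the full Result and filter_map then only has to discard the None items.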