How to handle errors in Stream

When working with a Stream from the futures crate, what is the best way to handle errors and filter them out of the stream?

The only way I can think of is something like:

stream.then(|item| match item {
    Ok(it) => Ok(Some(it)),
    Err(err) => {
        eprintln!("Got an error: {}", err); // or other error handling
        Ok(None)
    }
}).filter_map(|x| x)

But this is very awkward. Is there a better way to do this?

Also, the example above doesn't compile because the compiler can't infer the error type of the Result, and I'm not sure how to supply it.

The compiler reports an error because the closure returns a Result<Option<T>, E> and it can only infer T, not E. Unless I'm misunderstanding your question, if you want to ignore the errors, you can use filter_map directly.

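use futures::Stream;
use futures::sync::mpsc;

fn main() {
    // The channel's items are themselves Results, so filter_map sees each one.
    let (_tx, rx) = mpsc::channel::<Result<usize, ()>>(1);
    // Streams are lazy: nothing runs until the filtered stream is polled.
    let _filtered = rx.filter_map(|v| {
        // Log the error, then drop it.
        if let Err(e) = v {
            eprintln!("{:?}", e);
        }

        // Keep only the Ok values.
        v.ok()
    });
}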

Sorry, I should have mentioned that I'm using futures 0.1 (0.3 is unstable and 0.2 has been yanked). There, filter_map only filters over the Ok values of a stream, not its errors.
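
On the inference problem from the question: one way to supply the missing error type is a turbofish on Ok, e.g. Ok::<_, ()>(Some(it)). The sketch below uses only the standard library so it can be run without the futures crate. map_all is a hypothetical stand-in (not a futures API) for a combinator that, like then, is generic over the closure's error type; keep_ok is likewise just an illustrative name.

```rust
use std::fmt::Display;

/// Hypothetical stand-in for a combinator such as `then`: it is generic over
/// the closure's error type `E`, so nothing outside the closure pins `E` down.
fn map_all<T, U, E, F>(items: Vec<T>, mut f: F) -> Result<Vec<U>, E>
where
    F: FnMut(T) -> Result<U, E>,
{
    items.into_iter().map(|item| f(item)).collect()
}

/// Log every error and keep only the successful values.
fn keep_ok<T, D: Display>(items: Vec<Result<T, D>>) -> Vec<T> {
    let kept = map_all(items, |item| match item {
        // The turbofish names the otherwise unconstrained error type as `()`.
        Ok(it) => Ok::<_, ()>(Some(it)),
        Err(err) => {
            eprintln!("Got an error: {}", err); // or other error handling
            Ok(None)
        }
    });
    // The closure never returns Err, so the default (empty Vec) is never used.
    kept.unwrap_or_default().into_iter().flatten().collect()
}
```

Calling keep_ok on a Vec of Results returns just the Ok values and prints each error to stderr. The same annotation on the first match arm of the then closure in the question should let the compiler settle on an error type there as well, assuming () is acceptable as the resulting stream's error type.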