Futures: how to inspect and drop errors from a stream?


#1

I’m looking for some help on working with futures, specifically on how to handle error elements in a stream.

I have a Stream where occasional errors are possible; they only need to be inspected and can then be dropped from the rest of the chain. In such a case, how can I inspect/filter/skip an Err element?

A minimal example would be the following code (playground here):

extern crate futures; // 0.2.1
extern crate futures_executor; // 0.2.1
use futures::prelude::*;

fn main() {
    type ElemKind = Result<u8, u8>;
    let elems: Vec<ElemKind> = vec![Ok(1), Err(2), Ok(3)];
    let stream = futures::stream::iter_result(elems)
        .inspect_err(|e| {
            if *e == 2 {
                println!("Element to filter out of the stream: {}", e)
            }
        })
        /*
        Which combinator to inspect and filter out specific errors
        from a stream here?
        */
    ;

    let items: Vec<u8> = futures_executor::block_on(stream.collect()).unwrap();
    assert_eq!(items, vec![1, 3])
}

I think I am looking for something like Stream::skip_while, but that only works on non-Err items; is there anything similar but taking a Result instead?


#2

I think you are after something like or_else or recover.

These methods force you to inject a Self::Item back into the stream, though, so you would then need something dirty like making your item type Option<T> and then flattening the options. Maybe there is a cleaner way somewhere, or maybe this is a PR waiting to happen for a filter_err() stream method.


#3

As @HadrienG alluded to, the typical way is to use then() to turn the stream into a Stream<Item=Option<T>, Error=()>, mapping each Ok(t) to Some(t) and each Err to None. Follow that with filter_map() to unwrap the Option back to just T, which drops the error values and yields a Stream<Item=T, Error=()>.

example
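To make the round trip concrete without depending on a particular futures version, here is a minimal std-only sketch of the same idea. It is an analogy, not the futures API: a plain Iterator of Results stands in for the Stream, Iterator::map stands in for then(), and the helper name drop_errors is hypothetical. The stream version would follow the same shape, with then() and filter_map() as described above.

```rust
// Std-only analogy: an Iterator of Results stands in for the Stream.
// `drop_errors` is a hypothetical helper name used for illustration.
fn drop_errors(elems: Vec<Result<u8, u8>>) -> Vec<u8> {
    elems
        .into_iter()
        // Inspect step: look at each element and report errors, changing nothing.
        .inspect(|r| {
            if let Err(e) = r {
                println!("Element to filter out of the stream: {}", e);
            }
        })
        // Result<u8, u8> -> Option<u8>: every Err becomes None.
        .map(|r| r.ok())
        // Unwrap the Options, skipping the Nones (the former errors).
        .filter_map(|opt| opt)
        .collect()
}

fn main() {
    let elems = vec![Ok(1), Err(2), Ok(3)];
    assert_eq!(drop_errors(elems), vec![1, 3]);
}
```

The map and filter_map steps are kept separate here only to mirror the then() + filter_map() pair; on a plain iterator they could be collapsed into a single filter_map(|r| r.ok()).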


#4

Thanks both! I guess I was missing the key idea of a round trip through a Result<->Option transposition. This solves my use case pretty well.