Streams of futures and failures


#1

When running a set of Tokio-based futures, I created a stream of them with futures::stream::futures_unordered() and .for_each(), figuring that I would get the following benefits:

  1. Each future would be processed as soon as it resolved, rather than waiting for them all to finish, as with future::join_all(), which I tried out in some early test code.

  2. I would get the resolution of all futures, whether they succeeded or failed. The futures are chains of network transactions, and the whole chain is of type Future<Item=String, Error=String>. The code returns both Ok() and Err() values in various places as appropriate.

#1 above seems to be true, although I can’t be sure because the transactions are fast. But #2 definitely is not true. In the failure case, the stream ends at the first Err and the remaining futures are never reported.

Main looks like:

use futures::future::lazy;
use futures::Stream;
use tokio::runtime::Runtime;

fn main() {
    let mut runtime = Runtime::new().unwrap();
    let input_values: [&str; 3] = ["good value", "value that triggers error", "good value"];

    // for_each resolves with the first Err the stream yields, ending the stream early.
    match runtime.block_on(lazy(|| {
        futures::stream::futures_unordered(input_values.iter().map(|value| get_future(value)))
            .for_each(|resp| {
                println!("{}", resp);
                Ok(())
            })
    })) {
        Ok(()) => println!("done"),
        Err(e) => println!("error: {:?}", e),
    }
}
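
The early exit can be reproduced without Tokio. The following is only a std-only analogy, not the futures API itself: Iterator::try_for_each stops at the first Err in roughly the way for_each on a stream does. fake_transaction and process_until_error are hypothetical names invented for this sketch; they stand in for get_future and the stream processing.

```rust
/// Hypothetical stand-in for one chain of network transactions.
/// Any input containing "error" fails; everything else succeeds.
fn fake_transaction(input: &str) -> Result<String, String> {
    if input.contains("error") {
        Err(format!("failed: {}", input))
    } else {
        Ok(format!("response to: {}", input))
    }
}

/// Processes the inputs in order, recording each success in `seen`.
/// Returns as soon as one transaction fails, so later inputs are
/// never processed (the analogue of the stream ending early).
fn process_until_error(inputs: &[&str], seen: &mut Vec<String>) -> Result<(), String> {
    inputs
        .iter()
        .try_for_each(|input| fake_transaction(input).map(|resp| seen.push(resp)))
}
```

Called with the three inputs from main, this records only the first response and then returns the error; the third input is never looked at.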

One way to handle this is to have everything in the future chain return Ok (and change the future signature to Future<Item=String, Error=()>). Maybe that’s the best approach, because logically speaking even serious errors shouldn’t be fatal to the stream. But are there other ways of handling this?

Also, I arrived at my usage of futures_unordered() by trial and error. Is it the best thing for this and did I use it correctly?

Thanks,
Chuck


#2

After a bit of consideration, I found a reasonable way forward. The result values in the future chain (the various Ok()s and Err()s) control how, and how far, the chain proceeds. So far it has the desired behavior, and it doesn’t make sense to perturb logic that already works. A good solution is to add an or_else invocation at the very end of the chain that transforms any propagated Err into an Ok, so that the stream never sees an error and can run to completion. Whatever processes the stream can then decide what to do with each answer. Right now everything is a String, but that will change to a struct that is more easily evaluated.
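
The idea can be sketched with std only, using Result::or_else in place of the or_else combinator on the future. This is an analogy under stated assumptions, not the actual Tokio code: fake_transaction, never_fail, and process_all are hypothetical names, and the "error: " prefix is just one way of marking a converted failure while everything is still a String.

```rust
/// Hypothetical stand-in for one chain of network transactions.
/// Any input containing "error" fails; everything else succeeds.
fn fake_transaction(input: &str) -> Result<String, String> {
    if input.contains("error") {
        Err(format!("failed: {}", input))
    } else {
        Ok(format!("response to: {}", input))
    }
}

/// The final step of the chain: turn any propagated Err into an Ok
/// that carries the error text. The error type becomes `()` because
/// this step never produces an Err.
fn never_fail(result: Result<String, String>) -> Result<String, ()> {
    result.or_else(|e| Ok(format!("error: {}", e)))
}

/// Processes every input. Because each result passes through
/// `never_fail`, `try_for_each` has no Err to stop on.
fn process_all(inputs: &[&str]) -> Vec<String> {
    let mut seen = Vec::new();
    let outcome: Result<(), ()> = inputs
        .iter()
        .try_for_each(|input| never_fail(fake_transaction(input)).map(|resp| seen.push(resp)));
    debug_assert!(outcome.is_ok());
    seen
}
```

With the three inputs from main, process_all returns three entries, the second one carrying the error text. Replacing the String with a struct or enum, as planned, would let the consumer tell successes from failures without parsing text.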