When running a set of Tokio-based futures, I created a stream of them with futures::stream::futures_unordered()
and .for_each(),
figuring that I would get the following benefits:
-
Each future would be processed as soon as it resolved, rather than waiting for them all to finish, as with
future::join_all()
, which I tried out in some early test code. -
I would get the resolution of all futures, whether they succeeded or failed. The Futures are chains of network transactions and the whole chain is of type
Future<Item=String, Error=String>
. The code returns bothOk()
andErr()
values in various places as appropriate.
#1 above seems to be true, although I can't be sure because the transaction are fast. But #2 definitely is not true. In the failure case, the stream ends.
Main looks like:
fn main() {
let mut runtime = Runtime::new().unwrap(); let input_values: [&str; 3] = ["good value", "value that triggers error, "good value"]; match runtime.block_on(lazy(|| { futures::stream::futures_unordered(input_values.iter() .map(|value| get_future(value)))) .for_each(|resp| { println!("{}", resp); Ok(()) }) })) { Ok(()) => println!("done"), Err(e) => { println!("error: {:?}", e); }, }
}
One way to handle this is to have everything in the future chain return Ok (and change the future signature to Future<Item=String, Error=()>
. Maybe that's the best thing because logically speaking even serious errors shouldn't be fatal to the stream. But, are there other ways of handling this?
Also, I arrived at my usage of futures_unordered() by trial and error. Is it the best thing for this and did I use it correctly?
Thanks,
Chuck