I have a Tokio futures function that reads from a few Tokio Streams. I want to be able to combine all the streams into one stream, while still being able to detect if any of the streams has ended.
A simplified example of my code:
extern crate futures;
use futures::{Future, stream};
use futures::stream::Stream;
fn main() {
let my_stream1 = stream::iter_ok::<_,()>(0u32 .. 20);
let my_stream2 = stream::iter_ok::<_,()>(100u32 .. 110);
let combined_stream = my_stream1.select(my_stream2);
let stream_done = combined_stream
.for_each(|x| {
print!("{}, ",x);
Ok(())
});
stream_done.wait().unwrap();
println!("");
}
This will print the output:
0, 100, 1, 101, 2, 102, 3, 103, 4, 104, 5, 105, 6, 106, 7, 107, 8, 108, 9, 109, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19,
In my case, my_stream2 represents requests coming from a user. I want to be able to abort right when I find out that my_stream2
has ended. This probably means that the user has closed the connection to this component, and there is no point in keeping processing other events from my_stream1
.
I couldn't think of an elegant way to solve this problem. I prefer to have a solution that uses combinators, and hopefully doesn't use the lower level Futures poll() interface.
If you are interested to see the actual code where I am having this issue, you can see it here:
https://github.com/realcr/cswitch/blob/master/src/timer.rs#L85
Any ideas are appreciated!
real.