Select()-ing two Streams, figuring out if any stream has ended

I have a Tokio futures function that reads from a few Tokio Streams. I want to be able to combine all the streams into one stream, while still being able to detect if any of the streams has ended.

A simplified example of my code:

extern crate futures;

use futures::{Future, stream};
use futures::stream::Stream;

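fn main() {
    // Two finite streams with error type (); my_stream2 stands in for the user's requests.
    let my_stream1 = stream::iter_ok::<_, ()>(0u32..20);
    let my_stream2 = stream::iter_ok::<_, ()>(100u32..110);

    // select() interleaves both streams and ends only when both have ended.
    let combined_stream = my_stream1.select(my_stream2);
    let stream_done = combined_stream.for_each(|x| {
        print!("{}, ", x);
        Ok(())
    });

    stream_done.wait().unwrap();
    println!();
}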

This will print the output:

0, 100, 1, 101, 2, 102, 3, 103, 4, 104, 5, 105, 6, 106, 7, 107, 8, 108, 9, 109, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19,

In my case, my_stream2 represents requests coming from a user. I want to be able to abort as soon as I find out that my_stream2 has ended. That probably means the user has closed the connection to this component, and there is no point in continuing to process events from my_stream1.

I couldn't think of an elegant way to solve this problem. I would prefer a solution that uses combinators and, hopefully, doesn't use the lower-level Futures poll() interface.

If you are interested in seeing the actual code where I am having this issue, you can find it here:
https://github.com/realcr/cswitch/blob/master/src/timer.rs#L85

Any ideas are appreciated!
real.

So you need a signal that my_stream2 has ended. One way to do that would be something like this. Basically, you wrap the yielded values in an enum that can also signal completion; completion is signaled by chaining a one-item stream that yields the stop value onto my_stream2. Then you add a take_while after the select, and let the combined stream continue only while the value is not the stop signal. You can use your TimerEvent enum for this by adding a variant that signals completion.
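To make the shape of that idea concrete without pulling in the futures crate, here is a minimal sketch that uses std iterators as a stand-in for Streams. This is not the code from the reply: `Interleave` is a hypothetical replacement for `select` (it simply alternates between its two inputs), and `until_second_ends` is a made-up helper name. The wrapping enum, the chained stop value and the `take_while` are the parts that should carry over to Streams.

```rust
use std::iter::{once, Fuse};

/// Wraps each item so that "the second source has ended" is itself an item.
#[derive(Debug, PartialEq)]
enum Status {
    Value(u32),
    Stop,
}

/// Hypothetical stand-in for `select`: alternates between two iterators and
/// keeps draining one of them after the other is exhausted.
struct Interleave<A, B> {
    a: Fuse<A>,
    b: Fuse<B>,
    take_a_next: bool,
}

impl<A: Iterator, B: Iterator<Item = A::Item>> Iterator for Interleave<A, B> {
    type Item = A::Item;

    fn next(&mut self) -> Option<Self::Item> {
        let a_first = self.take_a_next;
        self.take_a_next = !self.take_a_next;
        if a_first {
            self.a.next().or_else(|| self.b.next())
        } else {
            self.b.next().or_else(|| self.a.next())
        }
    }
}

/// Yields interleaved items until the second source ends, then stops,
/// even if the first source still has items left.
fn until_second_ends(first: Vec<u32>, second: Vec<u32>) -> Vec<u32> {
    let first = first.into_iter().map(Status::Value);
    // Chain a single Stop onto the source whose end we care about.
    let second = second.into_iter().map(Status::Value).chain(once(Status::Stop));

    let combined = Interleave { a: first.fuse(), b: second.fuse(), take_a_next: true };
    combined
        // Gate continuation on the item not being the stop signal.
        .take_while(|status| *status != Status::Stop)
        // Unwrap the payload again for the consumer.
        .filter_map(|status| match status {
            Status::Value(v) => Some(v),
            Status::Stop => None,
        })
        .collect()
}
```

Calling `until_second_ends((0..20).collect(), (100..103).collect())` stops as soon as the second source runs dry, rather than draining the remaining items of the first one.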

By the way, I don't think you should avoid rolling your own Stream impl - sometimes that expresses the intent better than a mix of combinators.

Hey @vitalyd, this is a really cool solution. I think that I'm going to use it to solve this problem.
Many thanks for taking the time to write the code example!

Hey, @vitalyd? In your solution on playground, you have:

enum Status {
    Value(u32),
    Stop,
}

...

(0u32..20).map(Status::Value)

I've now seen you use .map() a few times like this, with a bare path. It caught my eye the other day, but I think in that case the path was a function and I didn't look into it carefully enough.

It looks like a really useful construct, but what on earth is going on? AFAICT, I'd need to put a closure as an argument to map, or an FnMut reference (which I loosely guessed was what was going on previously).

This, however, has me intrigued:

  • how can Status::Value, an enum variant, be an FnMut?
  • how does the u32 value get applied or passed inside the enum field?

I've clearly missed something very useful in terms of implicit syntax or trickery, could you please elaborate on how this works?

It's actually a function: an enum variant constructor that takes a u32 and wraps it in Status::Value.

To add a bit to @ryan’s answer ...

Tuple structs and tuple enum variants are also available as functions (unit structs and unit variants are plain values rather than functions). You can think of Status::Value as fn(u32) -> Status. Function items and function pointers implement all 3 closure traits; this is a language feature. So Status::Value can be used as any of FnOnce(u32) -> Status, FnMut(u32) -> Status and Fn(u32) -> Status.
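A small sketch of what that means in practice. The helper names (`call_once`, `call_mut`, `call_ref`, `wrap_all`, `stop`) are made up for illustration; the point is only that the bare path `Status::Value` is accepted wherever a function pointer or any of the three closure traits is expected, while the unit variant `Status::Stop` is used as a value.

```rust
#[derive(Debug, PartialEq)]
enum Status {
    Value(u32),
    Stop,
}

// One hypothetical helper per closure trait, each calling its argument once.
fn call_once<F: FnOnce(u32) -> Status>(f: F) -> Status {
    f(1)
}

fn call_mut<F: FnMut(u32) -> Status>(mut f: F) -> Status {
    f(2)
}

fn call_ref<F: Fn(u32) -> Status>(f: F) -> Status {
    f(3)
}

fn wrap_all(values: Vec<u32>) -> Vec<Status> {
    // The tuple-variant constructor coerces to an ordinary function pointer...
    let ctor: fn(u32) -> Status = Status::Value;
    // ...which can then be handed to map() like any other function.
    values.into_iter().map(ctor).collect()
}

fn stop() -> Status {
    // A unit variant takes no arguments and is written as a plain value.
    Status::Stop
}
```

So `(0u32..20).map(Status::Value)` behaves like `(0u32..20).map(|x| Status::Value(x))`, just without the closure.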

Thank you. Clearly, something like this had to be going on. I sort of knew the latter part about the closure traits, but had no idea that tuple struct and variant constructors were functions. Wizardry!

Now I know to look, I'll go digging to find where this is documented. I haven't found anything yet in the obvious places; perhaps it's only in an RFC. I'll update here when I find it.

Using function pointers vs closures (specifically with map as the example) is covered here, which must be where my partial understanding came from: Advanced Functions & Closures - The Rust Programming Language

That tuple structs/enum variants can be functions is mentioned here, but not really in a way that makes the above implication apparent (and it reads as though it only applies to if and match): Types - The Rust Reference

Still digging, but I'll have to continue later.

I've just come to something of a revelation.

I did a little playing with this, and found it wasn't quite as useful as I had initially hoped.

enum Status {
    Value(u32),
    Pair(u32, u32),
}

let sv: Vec<Status> = (0u32..10).map(Status::Value).collect();
let sp: Vec<Status> = (0u32..10).zip(10u32..20).map(Status::Pair).collect();

sv works, but sp fails with an argument mismatch: zip yields a single 2-tuple per item, whereas Status::Pair is a function taking 2 separate args.
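For completeness, one way around the mismatch is a small closure that destructures the tuple before calling the constructor. This is a sketch with made-up function names (`values`, `pairs`) and shorter ranges than above, not something taken from the earlier replies.

```rust
#[derive(Debug, PartialEq)]
enum Status {
    Value(u32),
    Pair(u32, u32),
}

fn values() -> Vec<Status> {
    // One argument in, one argument expected: the bare path works.
    (0u32..3).map(Status::Value).collect()
}

fn pairs() -> Vec<Status> {
    (0u32..3)
        .zip(10u32..13)
        // zip yields (u32, u32); unpack it into the two arguments Pair expects.
        .map(|(a, b)| Status::Pair(a, b))
        .collect()
}
```

The bare-path shorthand only applies when the item type matches the constructor's argument list exactly, which is why the single-field variant works directly and the two-field one needs the closure.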

But what I've just realised is that tuple structs being functions is what makes the ordinary construction syntax work in the first place:

let p = Status::Pair(1, 2);

We treat that as if it were literal syntax, in the same way we do for {} structs with named fields, but it's actually a function call, because it's easier/better to make the item name a constructor function than to try to disambiguate the syntax. Wow.
