Futures-rs: How to combine latest values of two streams?


#1

How can I implement the equivalent of Rx combineLatest with futures-rs streams? (http://reactivex.io/documentation/operators/combinelatest.html)

I’d like to get a stream of pairs with the current value of either stream, with the latest value from the other stream. The first value would be emitted only after having one value from each stream.


#2

I believe this would be served by Stream::zip(), which creates a new stream type yielding pairs. It does require that the two streams have the same Error type, but you can map the error of one stream to the other’s using Stream::map_err().


#3

@DroidLogician I think zip() always needs to have one item from each stream, then pairs the corresponding items, like a… zipper :slight_smile:

“The zipped stream waits for both streams to produce an item, and then returns that pair.”


#4

The link in the first post explains the differences to zip() very well.

IMO such a stream is not a particularly good fit for futures-rs, because the same item can be returned multiple times.
That means that the item must implement Clone.
Additionally, since you don’t know that beforehand, you actually have to clone() every single item returned by both streams. This is only reasonable for cheap clone()s.

But it shouldn’t be too difficult to implement it anyway.