Futures-rs: How to combine latest values of two streams?

How can I implement the equivalent of Rx combineLatest with futures-rs streams? (http://reactivex.io/documentation/operators/combinelatest.html)

I’d like to get a stream of pairs: whenever either stream produces a value, it is paired with the latest value from the other stream. The first pair would be emitted only once each stream has produced at least one value.

I believe this would be served by Stream::zip(), which creates a new stream type yielding pairs. It does require that the two streams have the same Error type, but you can map the error of one stream to the other’s using Stream::map_err().

@DroidLogician I think zip() always needs one item from each stream and then pairs the corresponding items, like a… zipper :)

“The zipped stream waits for both streams to produce an item, and then returns that pair.”

The link in the first post explains the differences from zip() very well.
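To make the contrast with zip() concrete, here is a minimal std-only sketch. It models the two inputs as one interleaved list of events rather than as real futures-rs streams. The names Event, zip_pairs and combine_latest are hypothetical and exist only for this illustration, and Iterator::zip stands in for Stream::zip on the assumption that both pair items strictly by position.

```rust
/// One value from either input, listed in arrival order.
/// (Hypothetical type for this illustration only.)
#[derive(Debug)]
enum Event<A, B> {
    Left(A),
    Right(B),
}

/// zip-like behaviour: the n-th left value is paired with the n-th right value.
fn zip_pairs<A: Clone, B: Clone>(events: &[Event<A, B>]) -> Vec<(A, B)> {
    let lefts = events.iter().filter_map(|e| match e {
        Event::Left(a) => Some(a.clone()),
        _ => None,
    });
    let rights = events.iter().filter_map(|e| match e {
        Event::Right(b) => Some(b.clone()),
        _ => None,
    });
    lefts.zip(rights).collect()
}

/// combineLatest-like behaviour: every new value is paired with the most
/// recent value from the other side, once both sides have produced one.
fn combine_latest<A: Clone, B: Clone>(events: &[Event<A, B>]) -> Vec<(A, B)> {
    let mut latest_a: Option<A> = None;
    let mut latest_b: Option<B> = None;
    let mut out = Vec::new();
    for event in events {
        match event {
            Event::Left(a) => latest_a = Some(a.clone()),
            Event::Right(b) => latest_b = Some(b.clone()),
        }
        if let (Some(a), Some(b)) = (&latest_a, &latest_b) {
            out.push((a.clone(), b.clone()));
        }
    }
    out
}

fn main() {
    let events = vec![
        Event::Left(1),
        Event::Left(2),
        Event::Right('a'),
        Event::Left(3),
        Event::Right('b'),
    ];
    // prints: zip:            [(1, 'a'), (2, 'b')]
    println!("zip:            {:?}", zip_pairs(&events));
    // prints: combine_latest: [(2, 'a'), (3, 'a'), (3, 'b')]
    println!("combine_latest: {:?}", combine_latest(&events));
}
```

Note that the left value 1 never appears in the combine_latest output because it was replaced before the right side produced anything, while 3 and 'a' each appear twice.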

IMO such a stream is not a particularly good fit for futures-rs, because the same item can be returned multiple times.
That means that the item must implement Clone.
Additionally, since you can’t know beforehand whether an item will be emitted again, you have to clone() every single item returned by both streams. This is only reasonable when clone() is cheap.
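One common way to keep those clones cheap, which the thread itself does not mention, is to have the streams carry Arc<T> instead of T, so that each clone is only a reference-count increment. The sketch below is std-only; Frame and share are hypothetical names, and Frame stands in for any item that would be expensive to deep-copy.

```rust
use std::sync::Arc;

/// Stand-in for an item that is expensive to deep-copy.
/// It deliberately does not implement Clone.
#[derive(Debug)]
struct Frame {
    pixels: Vec<u8>,
}

/// Hands out the latest item twice, as a combineLatest-style stream would
/// when the same value takes part in two consecutive pairs.
fn share(latest: &Arc<Frame>) -> (Arc<Frame>, Arc<Frame>) {
    (Arc::clone(latest), Arc::clone(latest))
}

fn main() {
    let latest = Arc::new(Frame { pixels: vec![0; 1024 * 1024] });
    let (first, second) = share(&latest);

    // Both handles point at the same allocation; no pixel data was copied.
    assert!(Arc::ptr_eq(&first, &second));
    // prints: strong count: 3
    println!("strong count: {}", Arc::strong_count(&latest));
    // prints: bytes shared: 1048576
    println!("bytes shared: {}", first.pixels.len());
}
```

With this approach the item type itself does not need to implement Clone, only the Arc wrapper is cloned.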

But it shouldn’t be too difficult to implement it anyway.

I think this is a kind of combineLatest, using async_stream::stream! and tokio::select!:

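// Note: this appears to target tokio 0.2, where `Interval` and
// `watch::Receiver` implement `Stream` and `watch::Sender` has `broadcast()`.
use async_stream::stream;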
use futures::future::ready;
use futures::pin_mut;
use futures::stream::StreamExt;
use std::time::Duration;
use tokio::select;
use tokio::sync::watch;
use tokio::time::interval;

#[tokio::main]
async fn main() {
    let count = interval(Duration::from_millis(100)).scan(0, |acc: &mut u64, _| {
        *acc += 1;
        ready(Some(*acc))
    });
    let (bc_tx, bc_rx) = watch::channel(420);
    let mut bc_rx2 = bc_rx.clone().take(10);
    tokio::spawn(count.for_each(move |x| {
        bc_tx.broadcast(x).unwrap();
        ready(())
    }));
    let mut double = bc_rx.map(|x| x * 2).take(4);
    let tuples = stream! {
        let mut a = None;
        let mut b = None;
        loop {
            select! {
                Some(next_a) = bc_rx2.next() => { a = next_a.into(); }
                Some(next_b) = double.next() => { b = next_b.into(); }
                else => break,
            }
            if let (Some(a), Some(b)) = (a, b) {
                yield (a, b)
            }
        }
    };
    pin_mut!(tuples);
    while let Some(x) = tuples.next().await {
        println!("{:?}", x);
    }
    println!("ZE END");
}

It's more flexible than a fixed combineLatest operator, though: you can choose to yield something even when one of the streams has not yet yielded a value, filter the pairs, and so on. You aren't limited to waiting for at least one value from every input and then piping the tuple into a map or scan.
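As an illustration of that flexibility, here is a minimal std-only sketch of an "eager" variant that emits on every event and uses None for a side that has not produced anything yet. It works on a plain list of events rather than on real streams, and Event and combine_latest_eager are hypothetical names used only here.

```rust
/// One value from either input, listed in arrival order.
/// (Hypothetical type for this illustration only.)
#[derive(Debug)]
enum Event<A, B> {
    Left(A),
    Right(B),
}

/// Emits a pair after every event; a side that has not produced a value
/// yet shows up as None instead of delaying the output.
fn combine_latest_eager<A: Clone, B: Clone>(
    events: &[Event<A, B>],
) -> Vec<(Option<A>, Option<B>)> {
    let mut latest_a: Option<A> = None;
    let mut latest_b: Option<B> = None;
    let mut out = Vec::new();
    for event in events {
        match event {
            Event::Left(a) => latest_a = Some(a.clone()),
            Event::Right(b) => latest_b = Some(b.clone()),
        }
        out.push((latest_a.clone(), latest_b.clone()));
    }
    out
}

fn main() {
    let events = vec![Event::Left(1), Event::Right('a'), Event::Left(2)];
    // prints: [(Some(1), None), (Some(1), Some('a')), (Some(2), Some('a'))]
    println!("{:?}", combine_latest_eager(&events));
}
```

In the stream! version, the equivalent change would be to yield the pair of Options directly instead of guarding the yield with the if let.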

One more note:
Written this way, the tuples stream ends only when both input streams have ended.
If you want it to end as soon as either input stream ends, you'd have to do something like:

select! {
    // Leave the loop as soon as either input yields None.
    next_a = bc_rx2.next() => match next_a {
        Some(next_a) => a = next_a.into(),
        None => break,
    },
    next_b = double.next() => match next_b {
        Some(next_b) => b = next_b.into(),
        None => break,
    },
}