Futures-rs: How to combine latest values of two streams?

I think this is a kind of combineLatest, using async_stream::stream! and tokio::select!:

use async_stream::stream;
use futures::future::ready;
use futures::pin_mut;
use futures::stream::StreamExt;
use std::time::Duration;
use tokio::select;
use tokio::sync::watch;
use tokio::time::interval;

#[tokio::main]
async fn main() {
    let count = interval(Duration::from_millis(100)).scan(0, |acc: &mut u64, _| {
        *acc += 1;
        ready(Some(*acc))
    });
    let (bc_tx, bc_rx) = watch::channel(420);
    let mut bc_rx2 = bc_rx.clone().take(10);
    tokio::spawn(count.for_each(move |x| {
        bc_tx.broadcast(x).unwrap();
        ready(())
    }));
    let mut double = bc_rx.map(|x| x * 2).take(4);
    let tuples = stream! {
        let mut a = None;
        let mut b = None;
        loop {
            select! {
                Some(next_a) = bc_rx2.next() => { a = next_a.into(); }
                Some(next_b) = double.next() => { b = next_b.into(); }
                else => break,
            }
            if let (Some(a), Some(b)) = (a, b) {
                yield (a, b)
            }
        }
    };
    pin_mut!(tuples);
    while let Some(x) = tuples.next().await {
        println!("{:?}", x);
    }
    println!("ZE END");
}

it's more flexible though, as you can choose to yield something even when one of the streams has never yielded a value yet, filter, etc. You aren't bound to just waiting for at least one from every input and then piping the tuple into a map or scan.

One more note:
In this way, the tuples stream only ends when both input streams ended.
If you wanted it to end as soon as one of the input streams ended, you'd have to do something like:

select! {
    next_a = bc_rx2.next() => { if let Some(next_a) = next_a {
        a = next_a.into();
    } else { break } }
    next_b = double.next() => { if let Some(next_b) = next_b {
        b = next_b.into();
    } else { break } }
}