What happens to the other futures when select! chooses a branch?

I want to make sure that I'm selecting on futures in a loop correctly. I understand from the select! documentation that I need to fuse and pin the futures.

But what happens to the futures that were not selected in that branch when it exited?

Here's some example code:

use async_std::stream::{interval, StreamExt};
use futures::{future::FutureExt, pin_mut, select};
use std::time::Duration;
#[async_std::main]
async fn main() {
    let mut stream_a = interval(Duration::from_secs_f32(0.2)).enumerate();
    let mut stream_b = interval(Duration::from_secs_f32(0.2)).enumerate();

    let mut a_results = vec![];
    let mut b_results = vec![];
    while a_results.len() < 10 {
        let fused_a = stream_a.next().fuse();
        let fused_b = stream_b.next().fuse();
        pin_mut!(fused_a, fused_b);

        select! {
            a = fused_a => {
                a_results.push(a.unwrap().0);
            }
            b = fused_b => {
                b_results.push(b.unwrap().0);
            }
        }
    }

    println!("{} {}", a_results.len(), b_results.len(),);
    for (i, j) in a_results.iter().enumerate() {
        assert_eq!(i, *j);
    }
    for (i, j) in b_results.iter().enumerate() {
        assert_eq!(i, *j);
    }
}

Inside the loop, I fuse and pin the futures, but my understanding is that only one of them runs to completion. Let's say it's fused_a. My fear was that fused_b would be dropped when the next iteration of the loop starts and two new futures are assigned to fused_a and fused_b, and that an item from stream_b would be lost along with it. I haven't had any version of this script fail, so I think that fear is unfounded, but I don't understand why. What is happening under the hood when I reassign the two fused_* futures in the next iteration?

I think the future returned by next() will only take the values from the underlying stream if it is successfully polled to completion.

If that's the case, then this makes sense. select!() necessarily polls the futures in some order and returns as soon as one of them completes. So once, say, fused_a completes, fused_b has either not been polled at all or has only returned Poll::Pending; either way it was never polled to completion. Then it's dropped, and it hasn't actually taken a value out of the stream.
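To make that reasoning concrete, here is a rough std-only sketch of the loop from the question. Everything in it is hypothetical and only for illustration: `Ticker` is a toy stream that becomes ready on every n-th poll, `Next` is a stand-in for the real future returned by next(), and the inner `loop` is a crude replacement for select!() that polls in a fixed order and busy-polls instead of waiting for a wakeup. It is not how the futures crate implements the macro.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical toy stream: ready on every `period`-th poll, yielding 0, 1, 2, ...
struct Ticker {
    period: u32,
    polls: u32,
    next_item: usize,
}

impl Ticker {
    fn new(period: u32) -> Self {
        Ticker { period, polls: 0, next_item: 0 }
    }

    fn poll_next(&mut self) -> Poll<Option<usize>> {
        self.polls += 1;
        if self.polls % self.period == 0 {
            let item = self.next_item;
            self.next_item += 1; // an item leaves the stream only on this path
            Poll::Ready(Some(item))
        } else {
            Poll::Pending
        }
    }
}

// Hypothetical stand-in for the future returned by next(): it owns no item,
// only a mutable borrow of the stream.
struct Next<'a> {
    stream: &'a mut Ticker,
}

impl Future for Next<'_> {
    type Output = Option<usize>;

    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
        self.stream.poll_next()
    }
}

// A waker that does nothing; enough for this sketch because we busy-poll.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

fn select_loop() -> (Vec<usize>, Vec<usize>) {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);

    let mut stream_a = Ticker::new(2);
    let mut stream_b = Ticker::new(3);
    let mut a_results = vec![];
    let mut b_results = vec![];

    while a_results.len() < 5 {
        // Fresh futures every iteration, as in the question.
        let mut fut_a = Next { stream: &mut stream_a };
        let mut fut_b = Next { stream: &mut stream_b };

        // Crude select: poll both in a fixed order until one completes.
        loop {
            if let Poll::Ready(Some(a)) = Pin::new(&mut fut_a).poll(&mut cx) {
                a_results.push(a);
                break;
            }
            if let Poll::Ready(Some(b)) = Pin::new(&mut fut_b).poll(&mut cx) {
                b_results.push(b);
                break;
            }
        }
        // The future that did not complete is dropped here. It never held an
        // item, so nothing is lost from its stream.
    }

    (a_results, b_results)
}
```

Calling `select_loop()` from a `main` and running the same index assertions as in the question should pass for both result vectors: the losing future is dropped on every iteration, yet neither toy stream skips a number.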

If we look at the implementation of next(), it returns a Next. And Next::poll is just

    fn poll(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Self::Output> {
        self.stream.poll_next_unpin(cx)
    }

So I think this is what's happening. The Next future doesn't actually claim the next item from the stream until the stream actually produces it! So if it is dropped before being polled to completion, nothing is taken out of the stream.
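As a small illustration of that point, here is a std-only sketch in which a next()-style future is polled once, dropped while still pending, and then recreated. The `Counter` stream and the `Next` struct are made-up stand-ins, not the real async_std or futures types; `Counter` simply reports Pending on every other poll to imitate a timer that has not fired yet.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical toy stream: yields 0, 1, 2, ... but is Pending on every other poll.
struct Counter {
    next_item: usize,
    ready: bool,
}

impl Counter {
    fn new() -> Self {
        Counter { next_item: 0, ready: false }
    }

    fn poll_next(&mut self) -> Poll<Option<usize>> {
        if self.ready {
            self.ready = false;
            let item = self.next_item;
            self.next_item += 1; // the item is claimed only when we return Ready
            Poll::Ready(Some(item))
        } else {
            self.ready = true;
            Poll::Pending
        }
    }
}

// Hypothetical stand-in for `Next`: its poll just forwards to the stream.
struct Next<'a> {
    stream: &'a mut Counter,
}

impl Future for Next<'_> {
    type Output = Option<usize>;

    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
        self.stream.poll_next()
    }
}

// A waker that does nothing; sufficient here because we poll by hand.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

fn drop_pending_next() -> Vec<usize> {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);

    let mut stream = Counter::new();
    let mut items = Vec::new();

    while items.len() < 3 {
        // A fresh future per attempt: polled exactly once, then dropped.
        let mut fut = Next { stream: &mut stream };
        if let Poll::Ready(Some(item)) = Pin::new(&mut fut).poll(&mut cx) {
            items.push(item);
        }
        // `fut` goes out of scope here, pending or not. A pending one holds
        // no item, so dropping it takes nothing out of `stream`.
    }

    items
}
```

In this sketch every second future is dropped while pending, and `drop_pending_next()` still returns 0, 1, 2 in order. The same would not hold for a future that pulls a value out of its source before it completes and stores it internally, which is why the behavior depends on how the particular future is written.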

Yeah. You are dropping the next() future, but dropping it does nothing.
