Async-std streams and .fuse() called twice

I'm reading this example from async-std book:

use async_std::{
    io::{stdin, BufReader},
    net::{TcpStream, ToSocketAddrs},
    prelude::*,
    task,
};
use futures::{select, FutureExt};

// Some functions omitted

async fn try_run(addr: impl ToSocketAddrs) -> Result<()> {
    let stream = TcpStream::connect(addr).await?;
    let (reader, mut writer) = (&stream, &stream); 
    let mut lines_from_server = BufReader::new(reader).lines().fuse(); // .fuse() called here 1st time
    let mut lines_from_stdin = BufReader::new(stdin()).lines().fuse(); // .fuse() called here 1st time
    loop {
        select! {
            line = lines_from_server.next().fuse() => match line { // .fuse() called here 2nd time
                Some(line) => {
                    let line = line?;
                    println!("{}", line);
                },
                None => break,
            },
            line = lines_from_stdin.next().fuse() => match line { // .fuse() called here 2nd time
                Some(line) => {
                    let line = line?;
                    writer.write_all(line.as_bytes()).await?;
                    writer.write_all(b"\n").await?;
                }
                None => break,
            }
        }
    }
    Ok(())
}

The parts of the code that are confusing to me are this where streams are fused upon initialization:

    let mut lines_from_server = BufReader::new(reader).lines().fuse(); // .fuse() called here 1st time
    let mut lines_from_stdin = BufReader::new(stdin()).lines().fuse(); // .fuse() called here 1st time

and later here consuming the stream .fuse() is called again:

line = lines_from_server.next().fuse() => match line {} // .fuse() called here 2nd time
line = lines_from_stdin.next().fuse() => match line {} // .fuse() called here 2nd time

Why it is needed to call .fuse() twice like that, when creating the streams and when consuming?
Why .await is not used?
Why do we need to fuse the streams at all in this case, because when first None is found loop is exited and program ends?

.fuse() is actually called on different things and not twice on the same structure.

The first two .fuse() are called on streams and is generally a good practice to avoid the possibility to poll a completed stream again, even if, as you mentioned, this shouldn't happen since we exit the loop as soon as one stream is over. Since it is still a good practice and avoid bad cases to happen in the long term when these branches will be edited and may introduce that possibility.

The second two .fuse() are called on the Future returned by .next() and this is required by futures::select! macro. It only accept FusedFuture so this call is required here.

Considering your question about not calling .await. I do not know if you are referring to the futures in the select! branches or the returned value of select! so I'll explain for both cases:

  • we do not call .await on the futures in the different branches because it would mean that we will successively wait for each future to complete in order to give their result to the macro, preventing select! to do its job. We give it futures so it can poll them concurrently and returns the first one to complete.
  • we do not call .await on select! because it works as if it already awaits a future that resolves to the result of the first branch to complete.
1 Like

Thank you for the in depth detailed answer.
I didn't knew that features::select! macro requres FusedFuture.

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.