How to understand FusedFuture

Recently I have been reading the source code of the parity-bridges-common repo. Some of its .rs files are full of asynchronous code that I find hard to follow, especially the parts involving FusedFuture. Example files:

  1. sync_loop.rs
  2. message_loop.rs

According to the docs for futures::future::fuse, my understanding is that we can turn a Future into a FusedFuture and then poll it again and again, even after it has completed. Besides this, I found a function named terminated() on Fuse, with the documentation and example below.

Creates a new Fuse-wrapped future which is already terminated.
This can be useful in combination with looping and the select! macro, which bypasses terminated futures.

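use futures::channel::mpsc;
use futures::future::{Fuse, FusedFuture, FutureExt};
use futures::pin_mut;
use futures::select;
use futures::stream::StreamExt;

let (sender, mut stream) = mpsc::unbounded();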

// Send a few messages into the stream
sender.unbounded_send(()).unwrap();
sender.unbounded_send(()).unwrap();
drop(sender);

// Use `Fuse::terminated()` to create an already-terminated future
// which may be instantiated later.
// bear: do we have to start from terminated() here in order to use select!?
let foo_printer = Fuse::terminated();
pin_mut!(foo_printer);

loop {
    select! {
        _ = foo_printer => {},
        () = stream.select_next_some() => {
            if !foo_printer.is_terminated() {
                println!("Foo is already being printed!");
            } else {
                // bear: is this where we replace the future that foo_printer points to?
                foo_printer.set(async {
                    // do some other async operations
                    println!("Printing foo from `foo_printer` future");
                }.fuse());
            }
        },
        complete => break, // `foo_printer` is terminated and the stream is done
    }
}

I cannot understand when to use this function and how to combine it with select!. Can anyone help me understand it more easily? Or are there better docs or examples about this usage?

Some posts I found that may be useful:


Good question indeed :slightly_smiling_face:

Another interesting post: https://github.com/tokio-rs/tokio/pull/2152#issue-365671263

Several things:

  • The Future API / contract of its .poll() method states that a Future that has yielded Poll::Ready(_) must no longer be polled. Futures are thus allowed to panic or exhibit other logic bugs (but not memory unsafety!) if this condition is violated. This is a legitimate API restriction, since most futures are polled in some form of loop / event loop until completion. At that point either the loop is "broken" (interrupted) or the future is removed from the event loop, so in effect it is never polled again.

  • Alas, some more complex scenarios may need to poll a Future without being able to remember whether this Future has already yielded Ready. That's a pickle :grimacing: Well, the solution is easy: just keep a boolean flag next to the Future; before polling it, check the flag, and once it yields Ready, flip the flag. To make sure we don't forget to do that, let's bundle the Future and the flag within a wrapping type which will implement Future (so that every .poll() call goes through it, giving it a chance to update the flag), while also offering a special API to check the status of the flag ("is the future terminated?"): see futures::future::FusedFuture.

    That's a Fused / .fuse()d Future, and when you think about it, besides some Future-related technicalities, it does look a lot like an Option<Future>:

    • The terminated flag is basically an .is_none(), and polling the inner future will, when it yields Ready, .take() / remove "it" so that the FusedFuture becomes vacant / terminated / none.
  • At that point you may now guess what is going on with that weird code: foo_printer is a Fuse<SomeAsyncFut>, i.e., a kind of Option<SomeAsyncFut>, which is initialized later: it begins "empty":

    In the same fashion that we sometimes write:

    let mut thing = None;
    while … {
        if let Some(…) = thing {
            …
        } else if … {
            thing = Some(…);
        } …
    }
    

    In your example they wrote:

    foo_printer = Fuse::terminated(); /* thing = None */
    

    , and later on, they wrote:

    foo_printer.set(async { … }.fuse()) /* thing = Some(async { … }); */
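To make the Option analogy concrete, here is a minimal std-only sketch of such a wrapper. `SketchFuse`, `NoopWake`, `poll_once` and `demo` are hypothetical names made up for this illustration, not the futures crate's API. Boxing the inner future is a simplification to avoid pin projection, and returning Pending after termination is my reading of how the real Fuse behaves.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for `Fuse`: literally an `Option` around a future.
struct SketchFuse<F> {
    inner: Option<Pin<Box<F>>>,
}

impl<F: Future> SketchFuse<F> {
    /// Starts "empty", i.e. already terminated (thing = None).
    fn terminated() -> Self {
        Self { inner: None }
    }

    /// Installs a fresh future (thing = Some(fut)).
    fn set(&mut self, fut: F) {
        self.inner = Some(Box::pin(fut));
    }

    /// The flag: is there nothing left to poll?
    fn is_terminated(&self) -> bool {
        self.inner.is_none()
    }
}

impl<F: Future> Future for SketchFuse<F> {
    type Output = F::Output;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<F::Output> {
        // `SketchFuse` is `Unpin` because the inner future lives in a `Box`.
        let this = self.get_mut();
        let fut = match this.inner.as_mut() {
            Some(fut) => fut,
            // Already terminated: never touch the inner future again.
            None => return Poll::Pending,
        };
        let result = fut.as_mut().poll(cx);
        if result.is_ready() {
            // The `.take()` step: the wrapper becomes vacant / terminated.
            this.inner = None;
        }
        result
    }
}

/// A waker that does nothing, enough to poll by hand in this sketch.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls a future exactly once with the no-op waker.
fn poll_once<F: Future + Unpin>(fut: &mut F) -> Poll<F::Output> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    Pin::new(fut).poll(&mut cx)
}

/// Walks through the life cycle: empty, set, polled to completion, polled again.
fn demo() -> (bool, Poll<u8>, bool, Poll<u8>) {
    let mut slot: SketchFuse<std::future::Ready<u8>> = SketchFuse::terminated();
    let empty_at_start = slot.is_terminated();
    slot.set(std::future::ready(7));
    let first = poll_once(&mut slot);
    let empty_after_ready = slot.is_terminated();
    // Polling after completion is harmless here: no panic, just Pending.
    let second = poll_once(&mut slot);
    (empty_at_start, first, empty_after_ready, second)
}
```

Calling `demo()` goes through the same steps as the example in the question: the slot begins terminated, gets a future via `set`, yields its value once, and is terminated again afterwards.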
    

Finally, as to why they used this convoluted Option rather than a good old Option: futures::select! is a very basic / primitive macro that only supports polling (fused) futures, and the only possible "guard" for skipping a branch is this .is_terminated().not() / .is_some() flag. So I would qualify this pattern as a hacky workaround for a limitation of the macro.

There are smarter macros out there, such as tokio's, that do support classic boolean guards and pattern matching, thus enabling a direct use of Options :slightly_smiling_face:

(Another way to put it: the futures::select! macro desugars to checking that each future it has been given has not terminated, and that check turns out to be the only way to skip a branch inside such a primitive macro / match. So, if you want the equivalent of Options with Some(…) => branches inside it, you can hack them with a fake terminated() (fused) future that gets initialized (.set) afterwards.)
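As a rough illustration of that desugaring, here is a hand-written, std-only loop shaped like the one in the question. Everything in it is an assumption made for the sketch: `Slot`, `NoopWake` and `run_loop` are hypothetical names, a `VecDeque` stands in for the stream, a plain Option stands in for the fused future, and the branches are checked in a fixed order, whereas the real macro (as far as I know) picks pseudo-randomly among ready branches.

```rust
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Wake, Waker};

/// Hypothetical alias: `None` plays the role of `Fuse::terminated()`.
type Slot = Option<Pin<Box<dyn Future<Output = ()>>>>;

/// A waker that does nothing, enough to poll by hand in this sketch.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Runs the hand-written "select" loop over `messages` queued items and
/// returns a log of which branch fired on each iteration.
fn run_loop(messages: usize) -> Vec<&'static str> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut log = Vec::new();

    // Stand-in for the stream: a queue that is already closed.
    let mut queue: VecDeque<()> = std::iter::repeat(()).take(messages).collect();
    // Starts empty, like `Fuse::terminated()`.
    let mut foo_printer: Slot = None;

    loop {
        // Stream branch: skipped once the queue is exhausted.
        if let Some(()) = queue.pop_front() {
            if foo_printer.is_some() {
                log.push("already printing");
            } else {
                // The `.set(...)` step: install a fresh future.
                let fut: Pin<Box<dyn Future<Output = ()>>> = Box::pin(async {});
                foo_printer = Some(fut);
                log.push("foo_printer set");
            }
            continue;
        }

        // `foo_printer` branch: polled only while it is not terminated.
        if let Some(fut) = foo_printer.as_mut() {
            if fut.as_mut().poll(&mut cx).is_ready() {
                foo_printer = None; // terminated again
                log.push("foo_printer finished");
                continue;
            }
        }

        // `complete` branch: every branch is terminated.
        if foo_printer.is_none() && queue.is_empty() {
            break;
        }
        // A real executor would park here until woken. The `async {}` used in
        // this sketch is always ready on its first poll, so this is never reached.
    }
    log
}
```

The point to notice is that the only way a branch is skipped is the `is_some()` / "not terminated" check, which is why the example in the question starts `foo_printer` off as a terminated future rather than leaving it out of the loop.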
