Why doesn't tokio::select! require FusedFuture?

tokio::select! is quite different from futures::select!. One notable difference is that it doesn't require the polled futures to implement FusedFuture. This allows code like this to compile:

use tokio::{select, time::{interval, delay_for}};
use std::time::Duration;

async fn some_action() -> u64 {
    delay_for(Duration::from_secs(4)).await;
    42
}

#[tokio::main]
async fn main() {
    let mut interval = interval(Duration::from_secs(1));
    let mut some_action = Box::pin(some_action());
    
    loop {
        select! {
            result = &mut some_action => {
                println!("got result: {:?}", result);
            }
            _ = interval.tick() => {
                println!("tick!");
            }
        }
    }
}

This code panics at runtime: (playground)

thread 'main' panicked at '`async fn` resumed after completion', src/main.rs:4:31

The code is buggy, and it may not be apparent. Some code with a similar bug could only panic in rare cases like timeouts, so covering it with tests is really hard.
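
To see that the panic comes from the compiler-generated future itself rather than from anything Tokio does, here is a minimal std-only sketch. NoopWaker and poll_twice are names made up for this illustration. The future is polled by hand, and the second poll is wrapped in catch_unwind so the panic can be observed instead of ending the program.

```rust
use std::future::Future;
use std::panic::{catch_unwind, AssertUnwindSafe};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical helper: a waker that does nothing, enough for manual polling.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Completes on the first poll, unlike the timer-based version in the post.
async fn some_action() -> u64 {
    42
}

// Polls the same future twice and reports whether the second poll panicked.
fn poll_twice() -> (Poll<u64>, bool) {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(some_action());

    // First poll: the async fn runs to completion.
    let first = fut.as_mut().poll(&mut cx);

    // Second poll: the future has already completed, so the generated
    // state machine panics. catch_unwind turns that into an Err.
    let second_panicked = catch_unwind(AssertUnwindSafe(|| {
        let _ = fut.as_mut().poll(&mut cx);
    }))
    .is_err();

    (first, second_panicked)
}

fn main() {
    let (first, second_panicked) = poll_twice();
    println!("first poll: {:?}, second poll panicked: {}", first, second_panicked);
}
```

This is what the select! loop above effectively does: the first arm keeps polling some_action after it has produced its value.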

The futures::select! macro requires FusedFuture, so this whole class of bugs is checked at compile time. The compiler will force you to fuse everything, and no panic will occur.
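
For readers unfamiliar with fusing, here is a rough std-only sketch of the idea. Fused is a hypothetical stand-in for what .fuse() from the futures crate provides, not its actual implementation; NoopWaker, poll_once and demo are likewise made up for illustration. The wrapper drops the inner future once it completes and answers every later poll with Pending, so polling again is harmless.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical wrapper: remembers whether the inner future has completed.
struct Fused<F> {
    inner: Option<Pin<Box<F>>>,
}

impl<F: Future> Fused<F> {
    fn new(fut: F) -> Self {
        Fused { inner: Some(Box::pin(fut)) }
    }

    // Plays the role of FusedFuture::is_terminated.
    fn is_terminated(&self) -> bool {
        self.inner.is_none()
    }
}

impl<F: Future> Future for Fused<F> {
    type Output = F::Output;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<F::Output> {
        // Fused is Unpin because the inner future lives in a Box.
        let this = self.get_mut();
        let result = match this.inner.as_mut() {
            // Already finished: never touch the inner future again.
            None => return Poll::Pending,
            Some(fut) => fut.as_mut().poll(cx),
        };
        if result.is_ready() {
            this.inner = None;
        }
        result
    }
}

// A waker that does nothing, enough for manual polling.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

fn poll_once<F: Future + Unpin>(fut: &mut F) -> Poll<F::Output> {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    Pin::new(fut).poll(&mut cx)
}

fn demo() -> (Poll<u64>, bool, Poll<u64>) {
    let mut fut = Fused::new(async { 42u64 });
    let first = poll_once(&mut fut);
    let terminated = fut.is_terminated();
    // Polling an unwrapped async block here would panic instead.
    let second = poll_once(&mut fut);
    (first, terminated, second)
}

fn main() {
    println!("{:?}", demo());
}
```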

It's unclear how I could use extra features of tokio::select! to avoid the bug. I can't use a pattern match, since the future just returns a plain value. I can't use an arm guard because I don't have any suitable variables. I could introduce a variable and track the status of the future in it, but that's like implementing .fuse() in a very inconvenient, error-prone way. I could just fuse everything, but then I could just use futures::select! which checks that at compile time.

I'd like to discuss an example provided in the docs. The text above the example says:

Collect the contents of two streams. In this example, we rely on pattern matching and the fact that stream::iter is "fused", i.e. once the stream is complete, all calls to next() return None .

Note the implicit reliance on the fact that stream::iter is fused. If it is replaced with a non-fused stream, it would still be polled after returning Ready(None) and cause bad behavior at runtime (playground). The change of the stream implementation could happen in another place, where the connection to the select! invocation is not apparent, causing a bug that can't be detected locally. More than that, there is no apparent way to fix that example to work with non-fused streams. Pattern matching is already used and doesn't help, and tracking the state of each stream with extra variables is, again, inconvenient and error-prone. If you can't really use tokio::select! on non-fused streams, the lack of a FusedFuture trait bound doesn't really provide any benefits.

The description of the pull request that added tokio::select! does not answer my questions. I don't understand the rationale behind that. Yes, fusing futures is inconvenient, but it's necessary if you want to safely poll them in an uncontrolled manner, like in a select! macro. I could see tokio::select! being necessary in some rare cases where you really can't afford to fuse a future, but providing it as the default general purpose implementation of select! seems like a massive footgun to me.

If I had to guess, it's to handle streams that can return values after a None, which is explicitly stated as a goal of the tokio Stream trait as compared to the futures Stream trait.

Where is it stated? Tokio's Stream trait is just a reexport of futures's Stream trait, so it can't have different semantics. It doesn't even have a separate documentation in tokio.

Sorry I misread the docs. You’re right. I think what I was reading that led me to think that was the case recently was the docs for DelayQueue in tokio which does have that behavior where it can return values after None. It might be used in other places too. Which may be why select is set up the way it is?

Not only is the FusedFuture trait super annoying to work with, it also obfuscates the branch-disable functionality, and makes it really hard to disable branches in any case not natively supported by the flag in the fused future.

As for fused streams, the select! from futures is not perfect because if you have a non-fused stream, you can try to fuse the .next() future, which will still fail and cause a panic at runtime if the stream is exhausted. With Tokio's macro, you are supposed to just fuse the stream if you are polling multiple streams in a loop like that (or use something like StreamExt::merge instead of select!). Note that we cover the loop case in the Tokio tutorial.

Another point is that the majority of uses of select! don't use the &mut my_future functionality, but rather cancel all other branches when a branch completes. A large number of the cases with &mut my_future are better written using join! or try_join!.

Finally, we generally try to only use traits we believe have a clear path into the standard library. This is the case for Stream, but not for Sink, which is also missing in Tokio. Similarly, it will be a long time, if ever, before FusedFuture gets moved into std, because it's no one's priority to do so.

Thanks for the detailed response @alice.

Can you provide some examples where it's super annoying? My experience generally is that you just put .fuse() in some places and then it works.

Can you explain what that means? With FusedFuture, you can always tell if the branch is disabled using is_terminated().

That seems orthogonal to the fused issue. Adding a way to explicitly disable a branch of select! is nice, but my question was not about that.

That's definitely a problem of both tokio::select! and futures::select!. I don't see how the lack of FusedFuture bound helps with it, though. (I guess one could argue that futures::select! gives a false sense of security.)

In my practice, the majority of uses of select! is in loops or in functions that are called repeatedly. A stream produces multiple values, and I need to select! multiple times to get multiple values. Is there any common case of using select! a single time on streams?

That doesn't seem true to me. I'd say one of the main cases is to continuously do some work based on multiple inputs (streams and futures of different types).

The FusedFuture trait has everything to do with disabling branches: it's literally the only thing it does. Any futures::select! is equivalent to a tokio::select! where every single branch has an , if !future.is_terminated() guard. There's no analogue of Tokio's pattern matching in futures::select!. As for obfuscating the branch-disable feature, what I mean is that it is completely invisible in the code that branches get disabled. I suppose you could add several ways of disabling branches to make it easier to explicitly disable branches when you want to do that.
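
To make the idea of a disabled branch concrete, here is a simplified std-only sketch of a two-branch select with explicit guards. select_once, Fired, BoxFut and NoopWaker are hypothetical names, and the sketch polls the branches in a fixed order with a do-nothing waker, which is not how either real macro is implemented. The point is only that a false guard means the branch's future is never polled, which is the job an is_terminated() check or a hand-written boolean does.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

type BoxFut = Pin<Box<dyn Future<Output = u32>>>;

// Which branch produced a value in one round.
#[derive(Debug, PartialEq)]
enum Fired {
    A(u32),
    B(u32),
    Neither,
}

// Polls each enabled branch once; a branch whose guard is false is skipped.
fn select_once(a: &mut BoxFut, a_enabled: bool, b: &mut BoxFut, b_enabled: bool) -> Fired {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    if a_enabled {
        if let Poll::Ready(v) = a.as_mut().poll(&mut cx) {
            return Fired::A(v);
        }
    }
    if b_enabled {
        if let Poll::Ready(v) = b.as_mut().poll(&mut cx) {
            return Fired::B(v);
        }
    }
    Fired::Neither
}

// Runs three rounds, disabling each branch after it completes.
fn run() -> Vec<Fired> {
    let mut a: BoxFut = Box::pin(async { 1u32 });
    let mut b: BoxFut = Box::pin(async { 2u32 });
    let (mut a_done, mut b_done) = (false, false);
    let mut log = Vec::new();
    for _ in 0..3 {
        let fired = select_once(&mut a, !a_done, &mut b, !b_done);
        match fired {
            Fired::A(_) => a_done = true,
            Fired::B(_) => b_done = true,
            Fired::Neither => {}
        }
        log.push(fired);
    }
    log
}

fn main() {
    println!("{:?}", run());
}
```

Without the a_done guard, the second round would poll the completed future a again and hit the resumed-after-completion panic.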

You asked for some examples, so let's go through every select! in mini-redis, which is a larger example of what idiomatic async code using Tokio looks like.

The first place it is used illustrates how to implement shutdown. The code is here.

tokio::select! {
    res = server.run() => {
        if let Err(err) = res {
            error!(cause = %err, "failed to accept");
        }
    }
    _ = shutdown => {
        // The shutdown signal has been received.
        info!("shutting down");
    }
}

This is, in my experience, the most common use of select!. With FusedFuture, you have to fuse every branch, even though no branch needs to be fused.

You find the next example here.

// this is inside a loop
select! {
    // Receive messages from subscribed channels
    Some((channel_name, msg)) = subscriptions.next() => {
        use tokio::sync::broadcast::RecvError;

        let msg = match msg {
            Ok(msg) => msg,
            Err(RecvError::Lagged(_)) => continue,
            Err(RecvError::Closed) => unreachable!(),
        };

        dst.write_frame(&make_message_frame(channel_name, msg)).await?;
    }
    res = dst.read_frame() => {
        let frame = match res? {
            Some(frame) => frame,
            // This happens if the remote client has disconnected.
            None => return Ok(())
        };

        handle_command(
            frame,
            &mut self.channels,
            &mut subscriptions,
            dst,
        ).await?;
    }
    _ = shutdown.recv() => {
        return Ok(());
    }
};

Only the first branch could benefit from the FusedFuture trait, since the StreamMap type could implement FusedStream if Tokio used these traits. That said, without the pattern match, you would have to handle the None case inside the branch with a call to continue, and if you do have the pattern match, then FusedStream doesn't add anything functionality-wise, even if it does help with compile-time checking.

The two other branches would require an unnecessary .fuse(). Even if dst or shutdown knew whether they had completed, the methods are implemented as an async fn, which is incompatible with taking advantage of FusedFuture.

The third and final example. This is a task that prunes expired keys from a key-value store. Code is found here.

while !shared.is_shutdown() {
    if let Some(when) = shared.purge_expired_keys() {
        tokio::select! {
            _ = time::delay_until(when) => {}
            _ = shared.background_task.notified() => {}
        }
    } else {
        // There are no keys expiring in the future. Wait until the task is
        // notified.
        shared.background_task.notified().await;
    }
}

Neither branch has any need for fusing. It is possible that Delay could implement FusedFuture and thus not require a .fuse(), but .notified() certainly would require a .fuse(), as it is implemented as an async fn, which is incompatible with FusedFuture.

These examples have seven branches in total, and FusedFuture is only somewhat useful in one out of those seven branches, and even in the branch where it could help, you still also need the pattern match since we don't care about the stream returning None.

The point is that we are just really tired of putting .fuse() on nearly every single branch in every single select! we write, even though most of them have no use for it whatsoever. It's especially bad because FusedFuture is completely incompatible with anything implemented using async/await, which quite a lot of stuff is. Translating it to manual futures is not only error-prone, but clutters the documentation with future types, and makes the documentation less readable, as the method isn't marked as an async fn anymore.

Sure, but even when using select! in a loop (as we have two examples of above), that certainly does not require you to use &mut my_future on any of your branches. Usually your branch calls something like .next() or .recv() or .read() or .write() or .notified(), and with none of these do you need to keep a future alive from iteration to iteration. Additionally only one of these could implement FusedFuture unless you wanted to go away from async fn.

Besides the ability to cancel stuff (which is less relevant when used in loops), the primary advantage of select! over join! or try_join! is that you can have a resource that your branches use in some manner, but obtain mutable access to that resource when a branch completes until the next loop iteration. If you keep a future alive with &mut my_future, this advantage mostly goes away, as the future would still borrow the resource when the branches complete.

Your example of using &mut some_action would probably be better written like this, assuming you wanted to stop ticking when some_action() elapsed.

use tokio::{select, time::{interval, delay_for}};
use std::time::Duration;

async fn some_action() -> u64 {
    delay_for(Duration::from_secs(1)).await;
    42
}

async fn ticking() {
    let mut interval = interval(Duration::from_millis(100));
    loop {
        interval.tick().await;
        println!("tick!");
    }
}

#[tokio::main]
async fn main() {
    select! {
        result = some_action() => {
            println!("got result: {:?}", result);
        },
        _ = ticking() => unreachable!(),
    }
}

I would use futures::future::abortable and tokio::time::timeout instead of select! in these cases. That would make the code more concise and readable.

I agree. This is indeed a case where FusedFuture is not suitable.

For me as a user, the lack of fused traits in the API is a problem: if I want to select! repeatedly, I need to know if StreamMap and dst are internally fused. Without exposing the trait on StreamMap, my only choice is to read the source (I don't see a note in its docs about fused behavior). I agree that for async fn methods there is no clear solution to this problem, and solving it just for some cases is not that valuable. It makes sense to avoid FusedFuture and look for a solution that covers async fn methods as well. It's unfortunate that the current situation leaves me without tools for writing code like this in a robust way.

I'm new to all this async and tokio business so forgive me not understanding the issues here.

I have just been through an exercise to convert and extend some of my existing code from std threads to tokio tasks. Not that I'm likely to need the capability to juggle a million client connections performantly on one machine, which is what async is supposed to make possible.

No, this was all a learning exercise but in part driven by that select! macro. Which allows me to effectively join a bunch of tasks at the same time and catch when any one of them fails and terminates. If they do it is game over for the entire program. I never figured out how to do that nicely with std threads.

So far it has all worked out really well and that particular aspect was an easy part of the exercise.

There was mention of foot-guns above. It seems to me that working in async offers many other foot-guns already. I have been scratching my head a few times wondering why my code does not seem to even run when it all compiles cleanly. Sorry I can't give an example just now. So I'm not inclined to worry about select!. On the contrary, it's understandable, easy to use and works as advertised.

The abortable combinator can't replace shutdown generally. Mini-redis uses a broadcast channel because that allows it to kill many tasks with a single signal.

That’s what select_next_some is for.

Huh, that's a pretty neat way to do it.

Since I'm the one who initiated the work on removing pin and FusedFuture from select!:

Before we did this change, we had multiple questions per week on a variety of channels about what that pin_mut! stuff is, and what the weird error messages around FusedFuture were. A lot of people don't select in loops, they just use things for timeouts or cancellation. The new version massively simplifies that scenario.

And even if you had the old version and used .fuse(), it's not a great help. E.g.

let mut interval = interval(Duration::from_secs(1));
loop {
    select! {
        // A brand-new future is created (and fused) on every iteration.
        result = some_action().fuse() => {
            println!("got result: {:?}", result);
        }
        _ = interval.tick().fuse() => {
            println!("tick!");
        }
    }
}

is still broken, since every loop iteration will yield a new Future. In all cases you have to understand whether to act on new or old futures, and whether the future had already completed. Once you are there you can reuse FusedFuture with tokio:

let mut interval = interval(Duration::from_secs(1));
let mut some_action = Box::pin(some_action().fuse());

loop {
    select! {
        // The guard disables this branch once the fused future has completed.
        result = &mut some_action if !some_action.is_terminated() => {
            println!("got result: {:?}", result);
        }
        _ = interval.tick() => {
            println!("tick!");
        }
    }
}

But you don't have to! You can just use a boolean to do the same - and don't have to learn about or import FusedFuture:

let mut interval = interval(Duration::from_secs(1));
let mut some_action = Box::pin(some_action());
let mut action_completed = false;
    
    loop {
        select! {
            result = &mut some_action if !action_completed => {
                println!("got result: {:?}", result);
                action_completed = true;
            }
            _ = interval.tick() => {
                println!("tick!");
            }
        }
    }

In addition to making things easier this also helped to remove the dependency of the whole ecosystem on FusedFuture, which so far has no path to standardization.
