Select streams and timeout on one of them

Hi, I'm (unsuccessfully) trying to build the following setup in tokio:

  • Create a single stream out of two (I'm using the select function).
  • Timeout on only one of them.

As far as I can tell, I can only apply a timeout to a future, but the future I get from calling next() already covers both streams joined together.

The reason for doing this setup is that the stream that must timeout is used for media. Media might not be flowing at some points, but I want a minimum amount of activity to detect possible disconnections on the other end and things like that (keeping the state updated).

Is there a supported way of doing something like this? Or do I need to do a custom implementation of Select + Timeout on just one stream?

If I understand correctly, both streams return the same item type? If so, I would use select! in conjunction with tokio::time::timeout like this:

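use std::time::Duration;
use tokio_stream::{Stream, StreamExt}; // provides `.next()`; futures::StreamExt works too

async fn two_streams<S1: Stream<Item = ()> + Unpin, S2: Stream<Item = ()> + Unpin>(
    mut s1: S1,
    mut s2: S2,
) {
    // The loop ends as soon as either stream is exhausted (yields `None`).
    while let Some(value) = tokio::select! {
        // Only `s2` is wrapped in a timeout; a fresh one-second timer starts on every iteration.
        result = tokio::time::timeout(Duration::from_secs(1), s2.next()) => {
            match result {
                Ok(stream_result) => stream_result,
                Err(_) => todo!("handle timeout"),
            }
        },
        result = s1.next() => result,
    } {
        // process the value
    }
}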

Depending on how you want the timeout to flow through this code, you may want to change what select returns. If I misunderstood the original requirements, hopefully this snippet might still point you in the right direction!

Hi, thank you for your help. The reason I didn't want to go with that solution is that I'm not confident that it couldn't cause starvation of one of the streams.

I have just found a timeout function specific to streams, so that might be enough.

I hadn't seen it before because it is present in tokio-stream and not in tokio.

My pleasure. If you click through the "src" link on that function, you'll see that it actually uses the same future under the hood, so both should be equivalent. That helper method looks like it will produce cleaner-looking code than what I wrote, however!

I believe that even if they are both named Timeout, they are not the same implementation. One is a Future and the other a Stream. I believe that the stream implementations take care of starvation, but I'm not 100% sure.

By default, the tokio::select! macro randomly picks which branch to poll first, which avoids this.

Cool! Good to know, thank you.

I know this is very opinion-based, but is there any particular preference between the Future approach (next() + select! + timeout()) and the Stream approach (select() or merge() + timeout())?

I believe both options would solve my problem.

Basically the difference is whether the timeout is reset whenever the other stream yields an item.

Both can avoid resetting the timeout, right?

With Futures you can timeout only in the appropriate branch of the select!.

With Streams you can timeout in the appropriate stream and merge them after that.

Well, the future version from above doesn't avoid it, though I'm sure you could change it such that this doesn't happen.

You are totally right. My bad.
