Tokio::select - what if the `handler` block has a time-consuming `await`

I have this simple example. My concern is what happens if write1.write_all(msg).await?; takes a long time to complete (e.g. 2 seconds). Does the next loop iteration then also have to wait 2 seconds? I worry that if stream2 yields a message just a few milliseconds after stream1, it will have to wait 2 seconds before it is handled.

Thanks in advance.

loop {
    tokio::select! {
        Some(msg) = stream1.next() => {
            write1.write_all(msg).await?;
        },
        Some(msg) = stream2.next() => {
            write2.write_all(msg).await?;
        },
    };
}

The select!() macro will wait for the first future to finish and cancel the rest.

That means if your fast stream yields new messages they'll be received straight away, and then the next time through the loop if the slow stream gets a message that's when you'll handle it. It depends on the stream's implementation, but I'd assume it just yields an item when the next one becomes available instead of doing something funky like cancelling the request (as in the underlying API call, not the future being polled) and making you start again.

@Michael-F-Bryan My concern is the slow stream needs to wait too long to be handled, and I don't know what I should do to avoid that.

The detailed scenario is as follows:

  • Second 0: stream1 yields a message and the loop picks it to handle. Assume that the write1.write_all(msg).await? in stream1's handler takes 2 seconds.
  • Second 0.1: stream2 yields a message but the loop is still busy with stream1's write_all so the next loop cannot start yet.
  • Second 2: stream1's write_all is done. The next loop iteration starts and picks stream2 to handle.

--> Stream2 needs to wait 1.9s to be handled and I would love it to be handled sooner.

Yes, only one write_all runs.

Here is a sloppy example where two run concurrently.
https://play.rust-lang.org/?version=stable&mode=debug&edition=2018&gist=7bd1b0da1688040f99d00e759e03ae07

@jonh So what should I do if I want stream2 to be handled sooner instead of waiting 1.9s? Maybe I need to run it in another thread? Or is there any simpler alternative?

As my example shows, both run together, hence 6s instead of 12s.

This is an alternative using then
https://play.rust-lang.org/?version=stable&mode=debug&edition=2018&gist=1938c1ac234bf39f485abce577d23864


Consider this?

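let ((), ()) = tokio::try_join!(
    async {
        // next() returns a future, so it has to be awaited here
        while let Some(msg) = stream1.next().await {
            write1.write_all(msg).await?;
        }
        // Name the error type so `?` works inside the async block
        // (assuming write_all returns a std::io::Result)
        Ok::<(), std::io::Error>(())
    },
    async {
        while let Some(msg) = stream2.next().await {
            write2.write_all(msg).await?;
        }
        Ok::<(), std::io::Error>(())
    },
)?;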

The let ((), ()) part is just to verify at compile time that it returns what I expect it to.


That's the kind of approach – adding forced compile-time checking without runtime cost – that should go in a discussion of good programming practices for Rust. :clap: :clap:


@alice Thank you. I think it works for me if the 2 tasks can run independently. However, I have one more related question: what if those 2 async tasks need to access the same mutable write object (write1 and write2 are one instance)?

For example, I want to keep reading inputs from 2 different streams, process them, and finally write the output to only one write object. Do I need to add a mutex or something in this case? Thanks.

let ((), ()) = tokio::try_join!(
    async {
        while let Some(msg) = stream1.next().await {
            // Do some extra processing asap here
            // Then write the result to WRITE
            WRITE.write_all(msg).await?;
        }
        Ok::<(), std::io::Error>(())
    },
    async {
        while let Some(msg) = stream2.next().await {
            // Do some extra processing asap here
            // Then write the result to WRITE
            WRITE.write_all(msg).await?;
        }
        Ok::<(), std::io::Error>(())
    },
)?;

It doesn't make any sense to write to something concurrently, so you would just use the select! loop from your original post (or something equivalent using StreamExt::merge).

You can add processing using the stream combinators, or the async-stream crate.

To keep polling the streams while also writing, you can merge the streams and use stream_reader together with tokio::io::copy.


If there’s concern that the writes may take too long and you want to keep processing the incoming streams, you could use a tokio mpsc channel to effectively act as a write buffer.


@alice @drewkett Thanks for your explanation. I will try using the original tokio::select and push the data to an mpsc channel (which should be fast), then spawn another task to read from that channel and write to the writer.

