Handle an async TcpStream loop and an async timeout stream in the same loop?

I have a simple AsyncRead structure that implements futures::stream::Stream which basically wraps TcpStream, reads data from it, and returns parsed HTTP/2 frames.

// fake example
let mut frames = FramedStream::new(tcpstream);
while let Some(frame) = frames.next().await {
     // call on tcp event, process frame
}

I'd like to send a PING message after some time of inactivity. Now, I guess I'll have to change my loop into something like this:

// fake example
let mut frames = FramedStream::new(tcpstream);
loop {
    if let Some(frame) = frames.next() {
        // process if new frame found
    } else if (inactive > 300) {
        tcpstream.write("PING").await
    }
}

Though I see loops like this everywhere, I feel like having a loop like this is not the way to go and I'd guess it takes too many resources. I probably have to take a different approach and use waker.wake() but I'm not sure how. The loop's "tick" should be invoked by a system event somehow.

What would be the right approach to create functionality as explained above, where you have to basically handle two streams at a time and what would be the right approach for using waker.wake() in this case?

Is something like https://docs.rs/async-std/1.6.2/async_std/io/fn.timeout.html sufficient to solve this problem for a single stream?

The case for handling two streams is influenced by their interaction, which isn't sufficiently explained for me to offer a suggestion.

Typically I would do this using the select! macro.

@acalhoon, as far as I understand futures in Rust, futures-rs is the only library that allows for writing "universal" futures. Most of the time I write open-source libraries thus I can't bond my code to any existing runtime such as async-std, tokio etc - this part of rust is super weird :). I realized I can use async-std for closed source projects only where the result can be opinionated.

@alice, I've seen select! yeah. I just can't figure out whether this function is simply looping (loop {poll, poll, waker}) and polling both futures or it uses more sophisticated system events for invocation? I care about server resources that's why I ask. Would you please explain this in more detail?

I am not really familiar with async-std, but here's how you would do it using Tokio. It should probably be rather easy to adapt, although the futures crate's select! can be more cumbersome to use due to some fusing stuff.

loop {
    tokio::select! {
        frame = frames.next() => { // next() is from StreamExt
            // handle the frame
        },
        _ = time::delay_for(duration) => {
            // handle the timeout
        },
    }
}

The select! macro works by polling each branch, and then going to sleep until any branch sends a wake-up. Once a branch sends a wake-up, it polls every branch and goes to sleep again. It keeps going like this until a branch completes, at which point every other branch is cancelled and the body of the branch is executed.

Check out Tokio's tutorial on select. Note that this chapter depends somewhat on the preceding "async in depth" chapter.

1 Like

Can I summarize that by saying :

Select! effectively does a .await on multiple .async functions.
Whichever one becomes ready first is the one the whose body "{}" block is executed.
The rest are discarded.

Of course when you put the select! in a loop then everything that becomes ready will be done eventually. One at a time.

1 Like

Yes.

The way I wrote it, a new future is created for every branch in each iteration.

Ah, ah.

Thing is, when straining my simple mind on getting my applications to work I have enough trouble keeping their logic straight. I don't want to have to think about all the mechanics of futures, polls, pins, etc working under the hood.

I need a simple model to work to in mind.

Whilst we are here, is there any way to do a select! like operation on normal thread handles?

For example, I might want two threads running forever, if either of them fails that is an error. Can't do it with .join().

I was just thinking of recasting a threaded program to use tokio, just so I could do that. There are other tokio attractions as well of course.

When using tokio::spawn or spawn_blocking, you can use it directly because tokio::task::JoinHandle is a future. If it is a real thread, you can't await it, but you would use an oneshot channel instead.

Note that to not consume a future in a select!, you can use &mut the_future instead of the_future in the select!. This is also explained in Tokio's tutorial on select.

Yes, that is what I had in mind to use tokio::spawn and/or spawn_blocking so that I could then use select! on them.

I have seen examples of people doing this with real threads and channels. That always seemed like a kludge to me.

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.