Tokio callback question

I have an async axum handler that spawns off an async process that must act as a client and respond to requests coming in from a remote server. I call this process_incoming_messages below.

The process takes the form of a simple loop that must make a series of TcpStream::read_exact(..).await calls and process them. Here's the basic skeleton.

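use tokio::io::AsyncReadExt; // provides read_exact
use tokio::net::TcpStream;

async fn process_incoming_messages(in_stream: &mut TcpStream) {
    // initialize msg_buf (placeholder size; the real message length depends on the protocol)
    let mut msg_buf = [0u8; 64];
    loop {
        in_stream.read_exact(&mut msg_buf).await.unwrap();
        // process message in msg_buf
    }
}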

Eventually the server will stop sending messages. This means that the read_exact call will remain in the waiting state.

When this happens I wish to have another async task (perhaps an axum REST handler) be able to send in a message to abort and gracefully clean up.

I'm curious what the recommended approach to do this would be. It would be nice if I could set up a callback function that could be handled by the same task that is waiting for the message that never arrives.

Is this possible?

Thank you for any advice you may have.

You should first note that if the server closes the connection when it stops sending, then the call will not hang forever - it will return an error.

Anyway, to cancel the operation on some other condition, you can use tokio::select!. Check out this page.


Thank you for the reply.

It turns out in my case the server is not so nice. In fact it uses what would probably be called an archaic protocol in that it expects the client to open both an input stream and an output stream instead of a single bi-directional stream.

It's actually a server that is sending in a stream of events that are emitted from a set of fixed position visual sensors (cameras) monitoring a sporting event. (e.g. a beach volleyball match. ) So it could be that the ball has been hit into the crowd and the server decides there's nothing else to send because all it can see is people moving around without a ball.

What I was hoping was there was a way to actually keep the task itself alive and have "it" decide whether or not to abort itself. Perhaps by having a handler cause the read_exact to return with an error that could be consumed inside the loop itself at which point a more "task-locally" based decision could be made.

I'm guessing this is just not possible and I should be looking for a way to kill the task from outside.

Perhaps I'm misunderstanding the solution you are proposing due to the fact that instead of the word "Shutdown" I would prefer to use the word "Interrupt".

My basic question is whether it's possible to interrupt an async task but not have it shut down? Similar to how you can interrupt a thread.

Thank you.

Well, what tokio::select! does is to interrupt the current operation. You don't have to return from the task afterwards if you don't want to.

Oh, okay. That helps a lot. For some reason that was not obvious to me. I was concerned that once interrupted, my only option was to shut down.

Thank you very much!


I want to make sure I'm thinking right. So the doc shows the following example.

use tokio::time::{self, Duration};

async fn some_async_work() {
    // do work
}

#[tokio::main]
async fn main() {
    // delay_for is the tokio 0.2 name; later versions call it sleep
    let mut delay = time::delay_for(Duration::from_millis(50));

    loop {
        tokio::select! {
            _ = &mut delay => {
                println!("operation timed out");
                break;
            }
            _ = some_async_work() => {
                println!("operation completed");
            }
        }
    }
}

Following this idea, would an acceptable approach be to perform the read_exact against the input socket inside some_async_work? Then, instead of the timeout used here, I would rig up some condition or message that could be invoked or sent from another task to cause the read_exact to die while the parent task lives on. (That way I could keep the same basic notion of a parent task that acts like main here, does some cleanup work, and then loops back to issue another read_exact.)

That seems fine.
