How to process the close signal before the message in a message processing loop?

fn process(_msg: u32) {}

#[tokio::main]
async fn main() {
    use tokio::{sync::{mpsc, oneshot::{self, error::TryRecvError}}, signal, time::{sleep, Duration}};
    let (msg_tx, mut msg_rx) = mpsc::unbounded_channel::<u32>();
    let (close_tx, mut close_rx) = oneshot::channel::<()>();
    tokio::spawn(async move {
        while let Some(msg) = msg_rx.recv().await {
            match close_rx.try_recv() {
                Err(TryRecvError::Empty) => {
                    process(msg);
                },
                Ok(()) => {
                    process(msg);
                    break;
                },
                Err(TryRecvError::Closed) => panic!(),
            }
        }
    });
    tokio::spawn(async move {
        loop {
            msg_tx.send(0).unwrap();
            println!("sent");
            sleep(Duration::from_millis(100)).await;
        }
    });
    signal::ctrl_c().await.unwrap();
    close_tx.send(()).unwrap();
}

This is a simplified message processing loop. I've added close-signal handling to it, but the logic is awkward: after the close signal is sent, the loop still has to wait for the next message to arrive before it can exit. I imagine polling close_rx manually first and then the message receiver, but I don't know how to pin it. select! doesn't seem to fit this case (or maybe I just haven't found the right way to use it).

After reading more of the documentation and examples for select!, I came up with the following. Is this correct?

fn process(msg: u32) {
    println!("recv {}", msg)
}

fn close() {
    println!("close")
}

#[tokio::main]
async fn main() {
    use tokio::{sync::{mpsc, oneshot}, signal, time::{sleep, Duration}};
    let (msg_tx, mut msg_rx) = mpsc::unbounded_channel::<u32>();
    let (close_tx, mut close_rx) = oneshot::channel::<()>();
    tokio::spawn(async move {
        loop {
            tokio::select! {
                biased;
                Ok(()) = &mut close_rx => {
                    close();
                    break;
                }
                msg_result = msg_rx.recv() => {
                    if let Some(msg) = msg_result {
                        process(msg);
                    } else {
                        break;
                    }
                }
            }
        }
    });
    tokio::spawn(async move {
        loop {
            for i in 0..1000 {
                msg_tx.send(i).unwrap();
                println!("sent {}", i);
                sleep(Duration::from_millis(100)).await;
            }
        }
    });
    signal::ctrl_c().await.unwrap();
    close_tx.send(()).unwrap();
    println!("pressed exit");
}

If all msg_tx senders are dropped, or close_tx sends a message, then exit; if a message is received, then process it; if close_tx is dropped, then panic. There should be no other cases.

You should wait for the close to finish before returning from main. See this page for an example of that.

Another thing to note is that your current code will not exit during a call to process. If waiting for process to return before exiting is what you want, then this is ok, but if it is not what you want, then you would need to change it.

Processing all the requests that have already been received and then exiting is the intended behavior. But I don't understand what I have to do to "wait for the close to finish".
The None branch of matching msg_result should have a close() before break. I just forgot to add it.

What I mean is that you have to wait here:

    close_tx.send(()).unwrap();
    println!("pressed exit");
    // here!
}

Thanks for the reminder. Given my previous experience implementing a channel wrapper that can respond to messages (i.e. requests), I think it would be possible to send another oneshot sender through the oneshot channel that carries the close signal, and use it to "respond" to main after the close is complete.

There are various ways to do it. The link I gave previously also suggests an approach.
