Gracefully shutting down in Tokio: Using channel TX in task

I think this might be a better medium for this question than the Discord channel.... apologies for the cross post.

While some useful direction came from the Discord group, the question in case 3 below is still open, and I'd like to understand how the borrow checker can accept a function that uses all three of the channel Sender functions - if there is a limit on what combinations of the three can be used in one function, the docs don't yet set out those limits.

The tokio::sync::oneshot::channel sender (say tx) has 'interesting' functions when you're using it to shut down a task by passing it to a handler function (as part of launching that task), wrapped in an Option, say some_tx. The handler signature looks like this:

    // This is the simplest signal handling in tokio.
    // We don't pass anything between the thread running `main` and the thread
    // `tokio::spawn`ed to run this function.
    pub async fn handle_signal(some_tx: Option<tokio::sync::oneshot::Sender<String>>) {
        //let tx = some_tx;
        tokio::signal::ctrl_c().await.expect("Handle ctl-c (Tokio built in)");
        println!("Trapped signal clt-c to quit.");
        //let Some(tx) = some_tx.take();
        if let Some(tx) = some_tx {
            match tx.send("Gracefully".to_string()){
                Ok(()) => println!("The message may be received."),
                Err(e) => println!("The message will never be received: {:?}",e),
            }
        }
        if let Some(tx) = some_tx {
            // Wait for the associated `rx` handle to be `rx.close()` or`drop(rx)`ed
            tx.closed().await;
            println!("The receiver (rx) is closed or dropped.");
        }
        println!("Exiting!");
        std::process::exit(1);
    }

Within that handler function a reasonable expectation might be to use three of the Sender functions: send(), closed() and is_closed()

All three of those have different signatures.

https://docs.rs/tokio/1.9.0/tokio/sync/oneshot/struct.Sender.html

Meaning you have to call each of them by accessing the tx inside the Option in a different way:

  1. for tx.send(): use if let Some(tx) = some_tx { tx.send() } or some_tx.map(|tx| tx.send())
  2. for tx.is_closed(): use some_tx.as_ref().map(|tx| tx.is_closed())
  3. for tx.closed().await: use ... this is where I'm stuck.
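
One way to see why the three calls need different access paths is to look at their receivers: in the linked docs, send takes self, is_closed takes &self, and closed takes &mut self. The sketch below uses a hypothetical stand-in type, FakeSender, that only mirrors those receiver kinds (it is not tokio's type, and its closed is synchronous here) to show which Option method fits each call. It assumes the consuming call comes last, since nothing is left to borrow afterwards.

```rust
// Hypothetical stand-in for illustration only; NOT tokio's oneshot::Sender.
// It only mirrors the receiver kinds: `self`, `&self`, `&mut self`.
struct FakeSender {
    receiver_gone: bool,
    polls: u32,
}

impl FakeSender {
    // Consumes the sender, like `Sender::send(self, value)`.
    fn send(self, msg: String) -> Result<(), String> {
        if self.receiver_gone { Err(msg) } else { Ok(()) }
    }

    // Shared borrow, like `Sender::is_closed(&self)`.
    fn is_closed(&self) -> bool {
        self.receiver_gone
    }

    // Exclusive borrow, like `Sender::closed(&mut self)` (async in tokio).
    fn closed(&mut self) -> u32 {
        self.polls += 1;
        self.polls
    }
}

// Returns (result of is_closed, whether send succeeded, whether the Option is now empty).
fn use_all_three(mut some_tx: Option<FakeSender>) -> (Option<bool>, bool, bool) {
    // `&self` method: borrow the contents with `as_ref()`.
    let was_closed = some_tx.as_ref().map(|tx| tx.is_closed());

    // `&mut self` method: borrow mutably with `as_mut()`.
    if let Some(tx) = some_tx.as_mut() {
        tx.closed();
    }

    // `self` method: move the sender out with `take()`, leaving `None` behind.
    let sent = match some_tx.take() {
        Some(tx) => tx.send("Gracefully".to_string()).is_ok(),
        None => false,
    };

    (was_closed, sent, some_tx.is_none())
}
```

This only demonstrates what the borrow checker accepts. With the real channel, awaiting closed() before send() would wait until the receiver has gone away, after which send() can only fail, so compiling is not the same as being useful.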

To clarify why this is in the beginners thread... I wanted to write a hello world type of CLI app that respected user signals (say ctl-c) and gracefully shutdown.

Turns out understanding the detail of the graceful shutdown part is actually quite hard.

The borrow checker doesn't like the current version:

   |
23 |         if let Some(tx) = some_tx {
   |                     -- value moved here
...
29 |         if let Some(tx) = some_tx {
   |                     ^^ value used here after move
   |
   = note: move occurs because value has type `tokio::sync::oneshot::Sender<String>`, which does not implement the `Copy` trait
help: borrow this field in the pattern to avoid moving `some_tx.0`
   |
23 |         if let Some(ref tx) = some_tx {
   |                     ^^^

error[E0596]: cannot borrow `tx` as mutable, as it is not declared as mutable
  --> regatta/examples/05-pages-stream-4-ok-sig-c.rs:31:13
   |
29 |         if let Some(tx) = some_tx {
   |                     -- help: consider changing this to be mutable: `mut tx`
30 |             // Wait for the associated `rx` handle to be `rx.close()` or`drop(rx)`ed
31 |             tx.closed().await;
   |             ^^ cannot borrow as mutable

Some errors have detailed explanations: E0382, E0596.
For more information about an error, try `rustc --explain E0382`.

You need to do this:

    if let Some(tx) = &mut some_tx {
        ...
    }

or this:

    if let Some(tx) = some_tx.as_mut() {
        ...
    }

The first works because matching on references to stuff is like matching on the thing inside, except the fields become references. The second works because as_mut turns &mut Option<T> into Option<&mut T>.
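
As a small illustration of that explanation, here is a sketch using a plain Option<String> rather than a channel (the function name append_twice is made up for the example). Both forms bind a &mut String and leave the Option itself in place, so it can be used again afterwards.

```rust
fn append_twice(mut maybe: Option<String>) -> Option<String> {
    // Form 1: match on `&mut maybe`; the payload `s` is bound as `&mut String`.
    if let Some(s) = &mut maybe {
        s.push('!');
    }

    // Form 2: `as_mut()` turns `&mut Option<String>` into `Option<&mut String>`.
    if let Some(s) = maybe.as_mut() {
        s.push('?');
    }

    // Nothing was moved out, so `maybe` is still owned and usable here.
    maybe
}
```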

Thanks @alice , trying the second suggestion first - only because it is more helpful for a reader to look up documentation for as_mut():

The borrow checker is still unhappy (reading up on the error suggestions):

   |
23 |         if let Some(tx) = some_tx {
   |                     -- value partially moved here
...
29 |         if let Some(tx) = some_tx.as_mut() {
   |                           ^^^^^^^ value borrowed here after partial move
   |
   = note: partial move occurs because value has type `tokio::sync::oneshot::Sender<String>`, which does not implement the `Copy` trait
help: borrow this field in the pattern to avoid moving `some_tx.0`
   |
23 |         if let Some(ref tx) = some_tx {
   |                     ^^^

Some errors have detailed explanations: E0382, E0596.
For more information about an error, try `rustc --explain E0382`.

Ah, I didn't notice that it was a oneshot channel. You will need to call .take() instead of .as_mut() then. This replaces the variable with None.
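
To make the .take() behaviour concrete, here is a sketch with a plain Option<String> standing in for the Option<Sender<String>> (the names consume and take_then_check are invented for the example). take() moves the value out so it can be passed by value, and any later check sees None.

```rust
// Stands in for a method that needs ownership, such as `send(self, ...)`.
fn consume(value: String) -> usize {
    value.len()
}

// Returns (result of consuming the value, whether a value was still there afterwards).
fn take_then_check(mut slot: Option<String>) -> (Option<usize>, bool) {
    // `take()` moves the contents out and leaves `None` behind in `slot`.
    let consumed = slot.take().map(consume);

    // `slot` is still a valid variable after the move, but it is now empty.
    let still_has_value = slot.is_some();

    (consumed, still_has_value)
}
```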

Interestingly, cargo check tells a very different story compared to (I assume) rust-analyzer in the VS Code IDE...

Anyway, following the compiler feedback, I make three changes:

  1. Add ref to the first if let:
     if let Some(tx) = some_tx {
  2. Change the function signature to be mut some_tx.
  3. Use some_tx.as_mut() in the second if let.

With those changes handle_signal becomes:

    pub async fn handle_signal(mut some_tx: Option<tokio::sync::oneshot::Sender<String>>) {
        //let tx = some_tx;
        tokio::signal::ctrl_c().await.expect("Handle ctl-c (Tokio built in)");
        println!("Trapped signal clt-c to quit.");
        //let Some(tx) = some_tx.take();
        if let Some(ref tx) = some_tx {
            match tx.send("Gracefully".to_string()){
                Ok(()) => println!("The message may be received."),
                Err(e) => println!("The message will never be received: {:?}",e),
            }
        }
        if let Some(tx) = some_tx.as_mut() {
            // Wait for the associated `rx` handle to be `rx.close()` or`drop(rx)`ed
            tx.closed().await;
            println!("The receiver (rx) is closed or dropped.");
        }
        println!("Exiting!");
        std::process::exit(1);
    }

Against which there is now only one complaint (good news I think):

error[E0507]: cannot move out of `*tx` which is behind a shared reference
  --> regatta/examples/05-pages-stream-4-ok-sig-c.rs:24:19
   |
24 |             match tx.send("Gracefully".to_string()){
   |                   ^^ move occurs because `*tx` has type `tokio::sync::oneshot::Sender<String>`, which does not implement the `Copy` trait

For more information about this error, try `rustc --explain E0507`.

The solution is not going to involve adding ref to the Some(tx) things. You have to give up ownership to send on a oneshot channel.

In general, I don't think it will be possible for you to handle both directions of communication with the same channel. Check out this chapter if you haven't seen it already.

Using take() in place of as_mut() returns us to the error message shown here:

Apologies, I'm not sure I understand. By passing the TX part of the channel to the signal handler aren't I only communicating in one direction - back to the RX half of the channel in the thread running main?

Thanks I had seen that and also looked at the mini-redis and tiny approaches.
I was hoping to get something much simpler by using the sync oneshot channel - but that appears to be out of reach?

The .take() call needs to go on the first if, not the second if.

You are both sending a shutdown signal and waiting for shutdown to complete. That's two directions.

You could probably do it with two channels, one for each direction.
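
A rough sketch of the two-channel shape follows. To keep it free of external crates it swaps in std::sync::mpsc and a thread for tokio's oneshot channels and tasks, so treat it as an analogue rather than tokio code; the function and variable names are made up. One channel carries the shutdown request, the other carries the "finished" acknowledgement back.

```rust
use std::sync::mpsc;
use std::thread;

// Returns what the worker received followed by what the handler received.
fn shutdown_round_trip() -> Vec<String> {
    // Direction 1: handler -> worker ("please shut down").
    let (shutdown_tx, shutdown_rx) = mpsc::channel::<String>();
    // Direction 2: worker -> handler ("I have finished").
    let (done_tx, done_rx) = mpsc::channel::<String>();

    let worker = thread::spawn(move || {
        // Block until the shutdown request arrives.
        let reason = shutdown_rx.recv().expect("handler dropped shutdown_tx");
        // ... cleanup would go here ...
        done_tx.send("done".to_string()).expect("handler dropped done_rx");
        reason
    });

    // Handler side: request shutdown, then wait for the acknowledgement.
    shutdown_tx
        .send("Gracefully".to_string())
        .expect("worker dropped shutdown_rx");
    let ack = done_rx.recv().expect("worker dropped done_tx");

    let reason = worker.join().expect("worker panicked");
    vec![reason, ack]
}
```

Each sender is only ever used in one direction, so no single handle has to be both consumed by a send and kept around for waiting.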

Also, using std::process::exit(1) is a poor idea if you care about running destructors because it does not run destructors of any variables alive when you call it.

Many thanks for your patient help. I'll turn to the std::process::exit(1) once I'm out of this channel dance.

For when you get to it: The easiest way to exit with a status code that avoids this issue is to have a main method like this:

    fn main() {
        std::process::exit(real_main());
    }

    fn real_main() -> i32 {
        ...
    }

Note that you can put the #[tokio::main] annotation on real_main rather than main.
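
To show why this pattern helps, here is a sketch in which real_main owns a value with a destructor. The Connection type and the CLEANED_UP flag are invented for the example and exist only to make the destructor observable. Because real_main returns before std::process::exit is called, its locals are dropped first.

```rust
use std::sync::atomic::{AtomicBool, Ordering};

// Records whether the destructor ran; only here so the effect can be observed.
static CLEANED_UP: AtomicBool = AtomicBool::new(false);

// Hypothetical resource that needs cleanup on shutdown.
struct Connection;

impl Drop for Connection {
    fn drop(&mut self) {
        // Real code might flush a file or close a socket here.
        CLEANED_UP.store(true, Ordering::SeqCst);
    }
}

fn real_main() -> i32 {
    let _conn = Connection;
    // ... application logic ...
    1 // exit status; `_conn` is dropped as this function returns
}

// Helper so callers can check that the destructor has run.
fn cleanup_ran() -> bool {
    CLEANED_UP.load(Ordering::SeqCst)
}

// `main` would then be just:
//     fn main() { std::process::exit(real_main()); }
```

Calling std::process::exit(1) inside real_main instead would skip the Drop impl, which is the situation described above.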

Thanks again @Alice. I think I better understand your point now, and have adopted your suggestion.

IIUC, I remove the std::process::exit(1);, and in doing so the thread dies the first time the user sends ctl-c.
Am I right that this means:

  1. I should track the 'shutdown status' much the same way as the mini-redis example does with the Shutdown type.
  2. In each function I need to pass in the Shutdown struct and add some logic to guard against proceeding after a signal is sent (all fn are async).

The last part feels wrong and I wonder if I'm missing some trait/type insight?
Or, more likely, I haven't fully internalized the mini-redis details, and the solution lies in there?

You need some selects in the right place to have things exit, yes.
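
In tokio this would usually mean a tokio::select! between the shutdown receiver and the work being awaited. The sketch below is only a synchronous analogue using std::sync::mpsc, so that it runs without external crates; the worker function and max_units parameter are invented for the example. The idea it tries to show is the same: the check for a shutdown request sits at the point where the task would otherwise carry on.

```rust
use std::sync::mpsc::{Receiver, TryRecvError};

// Does up to `max_units` units of "work", stopping early if a shutdown
// message arrives or the sender is dropped. Returns the units completed.
fn worker(shutdown_rx: Receiver<()>, max_units: u32) -> u32 {
    let mut done = 0;
    while done < max_units {
        match shutdown_rx.try_recv() {
            // Shutdown requested, or the sender went away: stop cleanly.
            Ok(()) | Err(TryRecvError::Disconnected) => break,
            // Nothing yet: carry on with the next unit of work.
            Err(TryRecvError::Empty) => {}
        }
        // ... one unit of work would go here ...
        done += 1;
    }
    done
}
```

Only the loop that drives the work needs the check here; the functions it calls do not each need their own guard unless they run for a long time themselves.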

Hmm, so this relates to the observation/claim that while Rust prevents memory-related errors and enforces safe/correct memory usage, it does not guarantee your app will not/cannot leak memory.

Correct?

I'm not sure I see the connection. Rust generally makes it very hard to leak memory unintentionally, even if it is possible to do so without an unsafe block.

The connection, in my mind, was that by having std::process::exit(1); in the fn handle_signal it was possible to leak memory and the borrow checker wouldn't necessarily complain. Further, it is still possible for me to add

or to do that badly and leak memory - but that is on me. Rust does not offer any assurances on that front when a piece of code compiles without error or warning.

However, you can structure (per your kind suggestion) things so that memory doesn't leak - the code still compiles and runs but now ctl-c won't (necessarily) exit the whole app without some additional logic.

As you say, for some things Rust has guarantees (e.g. some of the work the borrow checker does); other things (e.g. memory leaks) Rust makes difficult, and if you adopt some sensible structure/practices (e.g. your suggestion), Rust makes them extremely difficult - but not guaranteed.

When the program exits, all memory it claimed is freed anyway. Leaking is a problem if the program continues to work, especially if leaked memory accumulates over time.

Apologies. I misunderstood @alice 's comment:

For no reason other than my ignorance I intuited that to mean 'could leak memory'. My bad. Thanks for clarifying. I now understand @alice to be making a more subtle point around destructors - which have jumped a few places in my list of important things to learn about Rust :)

For some programs that don't have anything they need to do on shutdown (e.g. write something to a file, close connections gracefully), calling std::process::exit is ok.
