Cancellation in Tonic/Tokio: How does it work?

I'm new to Rust and started to write a service with Tonic (gRPC). In my use-case I need to handle request cancellation. The tonic code already provides a pretty good example.

Now, I could just copy this code and it would work for me. But I do not understand how it is working. So, I would be glad if someone could explain this code in more detail.

My main questions are:

  • What happens when the request gets cancelled?
  • Why gets the token future dropped, when the request was cancelled?
  • Why is the cancellation_future not dropped, when the request was canceled?

see: tonic/examples/src/cancellation/server.rs at eeb3268f71ae5d1107c937392389db63d8f721fb · hyperium/tonic · GitHub

async fn with_cancellation_handler<FRequest, FCancellation>(
    request_future: FRequest,
    cancellation_future: FCancellation,
) -> Result<Response<HelloReply>, Status>
where
    FRequest: Future<Output = Result<Response<HelloReply>, Status>> + Send + 'static,
    FCancellation: Future<Output = Result<Response<HelloReply>, Status>> + Send + 'static,
{
    let token = CancellationToken::new();
    // Will call token.cancel() when the future is dropped, such as when the client cancels the request
    let _drop_guard = token.clone().drop_guard();
    let select_task = tokio::spawn(async move {
        // Can select on token cancellation on any cancellable future while handling the request,
        // allowing for custom cleanup code or monitoring
        select! {
            res = request_future => res,
            _ = token.cancelled() => cancellation_future.await,
        }
    });

    select_task.await.unwrap()
}

Thanks!

I found an answer for me. Please, do not hesitate to add more (correct) information to this topic.

I assume that the future of the request method in the example code above is dropped when the gRPC request is cancelled. With tokio::spawn() a new independent task is created. This new task does not get dropped when the request is cancelled because it is running in a separately.

So, the token future gets dropped when the request was cancelled. The cancellation_future is not dropped because it is part of the separate task.

Sources which helped me to understand the example code:

use std::{sync::{atomic::{AtomicU8, Ordering}, Arc}, time::Duration};
use tokio::{select, test, time::sleep};

#[test(flavor = "current_thread")]
async fn test_future() {
    let flag = Arc::new(AtomicU8::new(5));
    let func_flag = flag.clone();

    let func = async move {
        sleep(Duration::from_millis(10)).await;
        func_flag.fetch_add(2, Ordering::Relaxed);
    };

    select! {
        _ = func => (),
        _ = sleep(Duration::from_millis(0)) => (),
    }
    sleep(Duration::from_millis(20)).await;

    assert_eq!(flag.load(Ordering::Relaxed), 5);
}

#[test(flavor = "current_thread")]
async fn test_spawned_task() {
    let flag = Arc::new(AtomicU8::new(5));
    let func_flag = flag.clone();

    let func = tokio::spawn(async move {
        sleep(Duration::from_millis(10)).await;
        func_flag.fetch_add(2, Ordering::Relaxed);
    });

    // the spawned task will still be executed even if func is dropped here
    select! {
        _ = func => (),
        _ = sleep(Duration::from_millis(0)) => (),
    }
    sleep(Duration::from_millis(20)).await;

    assert_eq!(flag.load(Ordering::Relaxed), 7);
}

The utility you've shared looks like a tool for getting around the fact that Rust doesn't have async drop. When the gRPC request is cancelled (e.g. if the underlying connection is lost), then this just immediately stops the task from executing and runs the destructors. Those destructors can't do anything async.

However, since the tokio::spawn task is separate and survives the cancellation, it can still do async operations. It works via the drop guard that calls cancel on the token in its destructor. When the token is cancelled, the tokio::spawn task will start running cancellation_future in an async manner, and then exit once cancellation_future exits.

So that way, you essentially get to do async work on drop.

Note also that you can of course do more complicated things than just the select! that it currently uses.

Maybe you've already seen this link, but if not, then I'd like to add it to your collection: Graceful Shutdown | Tokio - An asynchronous Rust runtime

1 Like

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.