3 ways to stop a task with CancellationToken

Assume that I have a task that does some work in an infinite loop (listening for connections or taking data from a receiver). Now I want to be able to stop it gracefully on some cancellation signal. This can be done in at least 3 ways:

  1. Use the token inside the loop
  2. Use the token outside the loop
  3. Use token.run_until_cancelled (I think it is equivalent to 2)

To me they look more or less equivalent, but I feel that there are some pitfalls I could have missed. The question is when to use each of these ways and what the implications of each are for behavior, safety, and performance.

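use std::sync::Arc;
use std::sync::atomic::AtomicU64;
use tokio::{select, sync::mpsc};
use tokio_util::sync::CancellationToken;

async fn func_select_inside(mut receiver: mpsc::Receiver<String>, cancel: CancellationToken) {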
    println!("select! inside the loop:");
    let mut num_handled = 0;
    loop {
        select! {
            msg = receiver.recv() => {
                match msg {
                    Some(_msg) => {num_handled += 1;}
                    None => break,
                }
            }
            _ = cancel.cancelled() => {
                println!("[inside] cancelled");
                break;
            }
        }
    }
    println!("Num handled {num_handled}");
}

async fn func_select_outside(mut receiver: mpsc::Receiver<String>, cancel: CancellationToken) {
    println!("select! outside the loop:");

    let num_handled = Arc::new(AtomicU64::new(0));
    let main_loop = {
        let num_handled = num_handled.clone();
        async move {
            loop {
                match receiver.recv().await {
                    Some(_msg) => {
                        num_handled.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
                    }
                    None => break,
                }
            }
        }
    };

    tokio::select! {
        _ = main_loop => {},
        _ = cancel.cancelled() => {
            println!("[outside] cancelled");
        },
    }
    let num_handled = num_handled.load(std::sync::atomic::Ordering::SeqCst);
    println!("Num handled {num_handled}");
}

async fn func_run_until(mut receiver: mpsc::Receiver<String>, cancel: CancellationToken) {
    println!("run_until:");
    let num_handled = Arc::new(AtomicU64::new(0));

    let main_loop = {
        let num_handled = num_handled.clone();
        async move {
            loop {
                match receiver.recv().await {
                    Some(_msg) => {
                        num_handled.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
                    }
                    None => break,
                }
            }
        }
    };

    cancel.run_until_cancelled(main_loop).await;
    let num_handled = num_handled.load(std::sync::atomic::Ordering::SeqCst);
    println!("Num handled {num_handled}");
}

The full code can be found on the Rust Playground.

This design pattern is needed only if you need to perform async work after the cancellation inside the same async task.

Otherwise it's an unnecessary complication, because every Future can always be immediately cancelled by dropping it. Futures don't need to cooperate to do this, and every single one of them is forced to support immediate cancellation via Drop. Dropping always properly drops the entire call tree of futures. In other languages tasks run on their own and promises only return results, but in Rust futures have to be polled to make progress, and the executor can just stop polling them.
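To make the point about dropping concrete, here is a minimal std-only sketch (no Tokio involved). The names `DropFlag`, `Never`, `NoopWaker` and `poll_once_then_drop` are made up for this illustration. It polls a future once by hand, then drops it: the destructor of a value held across the `.await` runs, and the code after the `.await` never does.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical guard that records when it is dropped.
struct DropFlag(Arc<AtomicBool>);

impl Drop for DropFlag {
    fn drop(&mut self) {
        self.0.store(true, Ordering::SeqCst);
    }
}

/// A future that never completes (stand-in for waiting on a socket or channel).
struct Never;

impl Future for Never {
    type Output = ();
    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        Poll::Pending
    }
}

/// Waker that does nothing; enough for polling by hand.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Polls a future once, drops it, and returns (cleanup_ran, finished).
fn poll_once_then_drop() -> (bool, bool) {
    let dropped = Arc::new(AtomicBool::new(false));
    let finished = Arc::new(AtomicBool::new(false));

    let fut = {
        let guard = DropFlag(dropped.clone());
        let finished = finished.clone();
        async move {
            let _guard = guard; // lives across the await point
            Never.await; // suspends forever
            finished.store(true, Ordering::SeqCst); // never reached
        }
    };

    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);

    // Play the role of the executor: poll once, then simply stop polling.
    assert!(fut.as_mut().poll(&mut cx).is_pending());
    drop(fut); // cancellation: the future and everything it owns is dropped

    (dropped.load(Ordering::SeqCst), finished.load(Ordering::SeqCst))
}

fn main() {
    let (cleanup_ran, finished) = poll_once_then_drop();
    println!("cleanup ran: {cleanup_ran}, finished: {finished}");
    // prints "cleanup ran: true, finished: false"
}
```

Note that the cleanup available this way is synchronous only (whatever `Drop` can do), which is why a token is still useful when async work has to happen after cancellation.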

The .await syntax sugar doesn't give you an opportunity to drop a future, but there are wrappers like timeout that do. There's also an abort() method on the JoinHandle you get from spawn().
