I just wrote the hardest code in my life, any improvements?

This has two uses:

  • One can pass a Fn() -> Future to job_cancelable; once that Future is ready, the job is cancelled.
  • One can pass something like a Vec<T> and an FnMut(&mut T, F: Fn() -> Future) to fire, which runs the jobs concurrently; if SIGINT is received, all jobs quit gracefully.

use futures::stream::StreamExt;
use futures::Future;
use tokio_util::sync::{CancellationToken, WaitForCancellationFutureOwned};

#[tokio::main]
async fn main() {
    let mut ss = vec!["1".to_string(), "2".to_string(), "3".to_string()];
    job_cancelable(&mut "hi".to_string(), tokio::signal::ctrl_c).await;
    fire(&mut ss, job_cancelable).await;
}

async fn job_cancelable<F, Fut, Any>(s: &mut String, f: F)
where
    F: Fn() -> Fut,
    Fut: Future<Output = Any> + Send,
    Any: Send,
{
    println!("start {}", s);
    tokio::select! {
        _ = tokio::time::sleep(tokio::time::Duration::from_secs(4)) => {
            println!("end {}", s);
        }
        _ = f() => {
            println!("cancelled {}", s);
        }
    }
}

async fn fire<'a, F, T>(ss: &'a mut [String], mut f: F)
where
    F: FnMut(&'a mut String, Box<dyn Fn() -> WaitForCancellationFutureOwned>) -> T,
    T: Future<Output = ()>,
{
    let token = CancellationToken::new();
    let mut stream = tokio_stream::iter(ss.iter_mut())
        .map(|s| {
            let token = token.clone();
            let fut = Box::new(move || token.clone().cancelled_owned());
            f(s, fut)
        })
        .buffer_unordered(2);
    loop {
        tokio::select! {
            res = stream.next() => {
                match res {
                    Some(_) => {}
                    None => break,
                }
            },
            _ = tokio::signal::ctrl_c() => {
                token.cancel();
            }
        }
    }
}

Is there anything here that could still be improved?

This code is abstract and far enough removed from its purpose that it's hard to say what could be changed to do the job more simply; I can't tell why it is the way it is. But here is some plain “local analysis only” code review:

  • If job_cancelable's entire purpose is to be called by fire, then why does it take an async function instead of a CancellationToken value? If it has other purposes, clarification is needed via names and comments.

  • f would be much better named cancelled or something. Right now, one glancing at the signature of job_cancelable might expect that f is a function that makes up part of the job, not the cancellation mechanism.

  • job_cancelable calls f only once, but has an F: Fn() bound. It should be F: FnOnce() to be less restrictive; once you do that, you can get rid of the clone() in the closure that fire() provides.

  • In general, use more meaningful variable names that communicate what role they play.

  • An actual bug: in fire, you are calling ctrl_c() repeatedly inside the loop, so you will ignore a signal that happens to arrive when the loop is not waiting inside stream.next(). This can be fixed, and the code simplified, by putting the select! outside the loop:

    tokio::select! {
        () = stream.collect() => {},
        _ = tokio::signal::ctrl_c() => {
            token.cancel();
        }
    }
    

    (Here the looping is being done by collect().) In general, if you select! inside a loop, you need to make sure that either the expressions inside the select! don't create new futures, or those futures are cancellation safe (which next() is, so that part was OK). If you wanted to keep the loop, you would move the ctrl_c() outside it:

    let ctrlc = tokio::signal::ctrl_c();
    tokio::pin!(ctrlc); // needed for repeated use
    loop {
        tokio::select! {
            res = stream.next() => {
                match res {
                    Some(_) => {}
                    None => break,
                }
            },
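            // The guard disables this arm after the first signal: ctrl_c() is an
            // async fn, and its future must not be polled again once it has completed.
            _ = &mut ctrlc, if !token.is_cancelled() => {
                token.cancel();
            }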
        }
    }
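The Fn versus FnOnce point above can be seen without any async machinery. What follows is only a minimal std-only sketch: Token, into_signal, call_fn and call_fn_once are hypothetical stand-ins invented for illustration (they are not tokio_util APIs), and the counter exists purely to make clones observable.

```rust
use std::cell::Cell;
use std::rc::Rc;

/// Hypothetical stand-in for a cancellation token; it counts how often it is cloned.
struct Token {
    clones: Rc<Cell<u32>>,
}

impl Clone for Token {
    fn clone(&self) -> Self {
        self.clones.set(self.clones.get() + 1);
        Token { clones: Rc::clone(&self.clones) }
    }
}

impl Token {
    /// Consumes the token, the way `cancelled_owned()` consumes a `CancellationToken`.
    fn into_signal(self) -> &'static str {
        "cancelled"
    }
}

/// Callee with the stricter bound: the closure must be callable repeatedly.
fn call_fn<F: Fn() -> &'static str>(f: F) -> &'static str {
    f()
}

/// Callee with the looser bound: the closure only has to work once.
fn call_fn_once<F: FnOnce() -> &'static str>(f: F) -> &'static str {
    f()
}

/// Number of clones needed to satisfy the `Fn` bound.
fn clones_with_fn() -> u32 {
    let counter = Rc::new(Cell::new(0));
    let token = Token { clones: Rc::clone(&counter) };
    // An `Fn` closure cannot give its captured token away, so it clones on each call.
    call_fn(move || token.clone().into_signal());
    counter.get()
}

/// Number of clones needed to satisfy the `FnOnce` bound.
fn clones_with_fn_once() -> u32 {
    let counter = Rc::new(Cell::new(0));
    let token = Token { clones: Rc::clone(&counter) };
    // An `FnOnce` closure may consume the captured token directly.
    call_fn_once(move || token.into_signal());
    counter.get()
}

fn main() {
    println!("clones with Fn: {}", clones_with_fn());
    println!("clones with FnOnce: {}", clones_with_fn_once());
}
```

The callee body is identical in both cases; only the bound changes what the caller is allowed to pass.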
    

Thanks a loooooooooot :pray:

Now it's better.

use futures::stream::StreamExt;
use futures::Future;
use tokio_util::sync::{CancellationToken, WaitForCancellationFutureOwned};

#[tokio::main]
async fn main() {
    let mut ss = vec!["1".to_string(), "2".to_string(), "3".to_string()];
    job_cancelable(&mut "hi".to_string(), tokio::signal::ctrl_c).await;
    fire(&mut ss, job_cancelable).await;
}

/// This could be a method in a trait that users implement
/// so that the job can be cancelled and clean up gracefully.
///
/// Some `Ext` traits could then be implemented automatically based on it, like `fire` does,
/// so users can call `Ext::batch_job_cancelable` to run jobs in batches.
///
/// `cancelled` is a function returning a future; when that future resolves, the job is cancelled.
/// So users can pass `tokio::signal::ctrl_c`, a closure around `tokio::time::sleep`,
/// or a closure around `CancellationToken::cancelled()` to cancel the job.
async fn job_cancelable<F, Fut, Any>(s: &mut String, cancelled: F)
where
    F: FnOnce() -> Fut,
    Fut: Future<Output = Any> + Send,
    Any: Send,
{
    println!("start {}", s);

    tokio::select! {
        _ = async {
            loop {
                tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
                println!("recv chunk {}", s);
            }
        } => {}
        _ = cancelled() => {
            println!("cancelled {}", s);
        }
    }
}

async fn fire<'a, F, T>(ss: &'a mut [String], mut f: F)
where
    F: FnMut(&'a mut String, Box<dyn FnOnce() -> WaitForCancellationFutureOwned>) -> T,
    T: Future<Output = ()>,
{
    let token = CancellationToken::new();
    let stream = tokio_stream::iter(ss.iter_mut())
        .map(|s| {
            let token = token.clone();
            let fut = Box::new(|| token.cancelled_owned());
            f(s, fut)
        })
        .buffer_unordered(2);

    tokio::select! {
        // The `_` pattern cannot drive inference, so name the collect target explicitly.
        _ = stream.collect::<()>() => {},
        _ = tokio::signal::ctrl_c() => {
            token.cancel();
        }
    }
}

I cannot understand this. Could you please give me a more concrete description? :pray:


I'll go back to the tokio blog then. :face_holding_back_tears:

Remember that all operations take some time (except those optimized to zero code). Visualize this by imagining a version of your loop where there are explicit operations that take significant time:

loop {
    sleep(Duration::from_secs(1)).await;
    select! {
        _ = sleep(Duration::from_secs(2)) => {
            sleep(Duration::from_secs(3)).await;
        },
        _ = tokio::signal::ctrl_c() => {
            break;
        }
    }
}

Calling tokio::signal::ctrl_c() starts listening for the signal and returns a future, and when that future is dropped, it stops listening for the signal. When one of the arms of a select!() completes, the futures in all the other arms are dropped.

Therefore, the only time this program will respond to ^C is when it is performing the 2-second sleep. At all other times, there is no ctrl_c listener active.
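The timeline of that loop can be modelled with plain arithmetic. This is only a sketch under stated assumptions: each sleep takes exactly its nominal duration, no signal has arrived yet, and time is sampled at the start of each whole second. The constants and listening_at are names made up for this illustration.

```rust
// Durations, in seconds, of the three phases of one loop iteration:
// the sleep before select!, the select! itself, and the sleep inside the arm.
const BEFORE: u64 = 1;
const SELECT: u64 = 2;
const AFTER: u64 = 3;
const PERIOD: u64 = BEFORE + SELECT + AFTER;

/// Returns true if a ctrl_c() future is alive during the second starting at `t`
/// (seconds since the loop started), assuming no signal has arrived so far.
fn listening_at(t: u64) -> bool {
    let phase = t % PERIOD;
    // The listener only exists while the select! is being awaited.
    phase >= BEFORE && phase < BEFORE + SELECT
}

fn main() {
    let covered: Vec<u64> = (0..PERIOD).filter(|&t| listening_at(t)).collect();
    println!("seconds with a listener: {:?} out of {}", covered, PERIOD);
}
```

In this model only 2 of every 6 seconds are covered, so a ^C pressed at a random moment would be missed about two thirds of the time.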

Your original code would mostly work, because it spends most of its time awaiting stream.next(). But it is good practice to write code that works 100% of the time, both because occasional silent failures drive users nuts, and because if you string together 10 things that each work 99% of the time, you have something that works only about 90% of the time. You have to get many small things correct in order to write large programs that are usable at all.
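The 99%-to-90% figure can be checked with one line of arithmetic, assuming the ten steps fail independently of each other (an assumption made for this sketch; the post does not say so explicitly). all_succeed is a hypothetical helper name.

```rust
/// Probability that `n` independent steps all succeed,
/// when each step succeeds with probability `p`.
fn all_succeed(p: f64, n: i32) -> f64 {
    p.powi(n)
}

fn main() {
    // Ten steps at 99% each: prints 0.904
    println!("{:.3}", all_succeed(0.99, 10));
}
```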


Thanks again!

I tested, and it's real. And after reading select!'s doc, I finally understand.

Edit: And the F: FnOnce is not necessary.

use futures::stream::StreamExt;
use futures::Future;
use tokio_util::sync::{CancellationToken, WaitForCancellationFutureOwned};

#[tokio::main]
async fn main() {
    let mut ss = vec!["1".to_string(), "2".to_string(), "3".to_string()];
    job_cancelable(&mut "hi".to_string(), tokio::signal::ctrl_c()).await;
    fire(&mut ss, job_cancelable).await;
}

/// This could be a method in a trait that users implement
/// so that the job can be cancelled and clean up gracefully.
///
/// Some `Ext` traits could then be implemented automatically based on it, like `fire` does,
/// so users can call `Ext::batch_job_cancelable` to run jobs in batches.
///
/// `cancelled` is a future; when it resolves, the job is cancelled.
/// So users can pass `tokio::signal::ctrl_c()`, `tokio::time::sleep(..)`,
/// or `CancellationToken::cancelled()` to cancel the job.
async fn job_cancelable<Fut, Any>(s: &mut String, cancelled: Fut)
where
    Fut: Future<Output = Any> + Send,
    Any: Send,
{
    println!("start {}", s);

    tokio::select! {
        _ = async {
            loop {
                tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
                println!("recv chunk {}", s);
            }
        } => {}
        _ = cancelled => {
            println!("cancelled {}", s);
        }
    }
}

async fn fire<'a, F, T>(ss: &'a mut [String], mut f: F)
where
    F: FnMut(&'a mut String, WaitForCancellationFutureOwned) -> T,
    T: Future<Output = ()>,
{
    let token = CancellationToken::new();
    let stream = tokio_stream::iter(ss.iter_mut())
        .map(|s| {
            let token = token.clone();
            let fut = token.cancelled_owned();
            f(s, fut)
        })
        .buffer_unordered(2);

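    // `()` implements `Default + Extend<()>`, so it can serve as the collect target.
    let jobs = stream.collect::<()>();
    tokio::pin!(jobs); // lets select! poll `jobs` by reference and lets us await it afterwards

    tokio::select! {
        _ = &mut jobs => {},
        _ = tokio::signal::ctrl_c() => {
            token.cancel();
            // Keep driving the jobs so each one can observe the cancellation and finish;
            // returning here would drop them before they print "cancelled".
            jobs.await;
        }
    }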
}
