Scheduled async task

I want to have an async task (in Tokio) that wakes up at a specified time and does something.

I also want to be able to change the wake-up time (bring it forward) at any point.

The async task doesn't have any associated data, so it would be ok to kill/drop/destroy the existing task somehow and start another one when the wake-up time changes.

Is this possible? If so, what is the best way to go about it?

(I wondered whether there was a channel receive function that takes a timeout value, which I could use, but I didn't find one.)

It seems like you can have a timer task.

I did notice that the documentation for sleep in tokio::time says:

"Canceling a sleep instance is done by dropping the returned future. No additional cleanup work is required."

So maybe I can use that? Is that relevant?

I am now wondering if I need select!

That way the async task can be sleeping but also listening on a channel, I think?

What I came up with:

// Task for sleeping: calls timed.Run once the sleep time has elapsed.
// Assumes `use std::sync::Arc;` and `use tokio::sync::mpsc;` are in scope.
async fn sleep_loop(mut rx: mpsc::Receiver<u64>, state: Arc<SharedState>) {
    let mut sleep_ms = 5000;
    loop {
        let sleep = tokio::time::sleep(core::time::Duration::from_millis(sleep_ms));
        tokio::pin!(sleep);

        tokio::select! {
            ms = rx.recv() => {
                match ms {
                    // New delay received: the next loop iteration sleeps for it.
                    Some(ms) => sleep_ms = ms,
                    // All senders dropped: exit instead of panicking on unwrap().
                    None => return,
                }
            }
            _ = &mut sleep => {
                let mut st = ServerTrans::new();
                st.x.qy.sql = Arc::new("EXEC timed.Run()".to_string());
                state.process(st).await;
            }
        }
    }
}

Whole code here: RustDB/axumtest.rs at main · georgebarwood/RustDB · GitHub

You don't need the tokio::pin! + &mut sleep dance if you're not reusing the timer.

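async fn sleep_loop(mut rx: mpsc::Receiver<u64>, state: Arc<SharedState>) {
    let mut sleep_ms = 5000;
    loop {
        tokio::select! {
            ms = rx.recv() => {
                match ms {
                    // New delay received: the next loop iteration sleeps for it.
                    Some(ms) => sleep_ms = ms,
                    // All senders dropped: exit instead of panicking on unwrap().
                    None => return,
                }
            }
            // A fresh sleep future is created on every iteration, so no pinning is needed.
            _ = tokio::time::sleep(core::time::Duration::from_millis(sleep_ms)) => {
                let mut st = ServerTrans::new();
                st.x.qy.sql = Arc::new("EXEC timed.Run()".to_string());
                state.process(st).await;
            }
        }
    }
}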