Understanding futures: Make the `async_io::Timer` resettable for `async-std`

We have a SleepProvider containing a sleep() method which just calls async_io::Timer::after(duration) for async-std. In this MR I am trying to implement a new SleepFuture trait with a reset() method allowing us to change the duration of the sleep after it was already constructed.

Because my experience with futures in Rust is limited, I wasn't sure how to do this and ended up with an overly complex implementation for async-std.

I did some studying trying to understand futures better and based on this video and inspired by this example I got from reading the Async Rust Book I came up with a new way.

I'd like to know if:

  • The new implementation seems correct.
  • My understanding is on point based on the comments in my code.
  • It can still be improved.

Thanks in advance!

// Wrapper around `async_io::Timer` to make it resettable.
pub struct ResettableTimer {
    // Actual future that completes after a duration.
    timer: async_io::Timer,

    // The waker for the task that `ResettableTimer` is running on.
    // We can use this after setting `completed = true` to tell
    // `ResettableTimer`'s task to wake up, see that `completed = true`, and
    // move forward.
    waker: Option<std::task::Waker>,

    // Whether or not the timer has elapsed.
    completed: bool,
}

impl ResettableTimer {
    // Create a new `ResettableTimer` that completes after `duration`.
    pub fn new(duration: std::time::Duration) -> Self {
        Self {
            timer: async_io::Timer::after(duration),
            waker: None,
            completed: false,
        }
    }

    // Reset the timer to complete after `duration` from now.
    pub fn reset(&mut self, duration: std::time::Duration) {
        self.timer = async_io::Timer::after(duration);
        self.completed = false;

        // Wake up the task that `ResettableTimer` is running on, if there is one.
        if let Some(waker) = &self.waker {
            waker.wake_by_ref();
        }
    }
}

impl Future for ResettableTimer {
    type Output = ();

    fn poll(mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Self::Output> {
        // Check if timer is already completed.
        if self.completed {
            return std::task::Poll::Ready(());
        }

        // Poll the underlying future (the async_io timer).
        match std::pin::Pin::new(&mut self.timer).poll(cx) {
            std::task::Poll::Ready(_) => {
                // Mark as ready so other tasks polling `ResettableTimer` can see the completion status.
                self.completed = true;
                std::task::Poll::Ready(())
            }
            std::task::Poll::Pending => {
                // Set waker so that we can wake up the current task
                // when the timer has completed, ensuring that the future is polled
                // again and sees that `completed = true`.
                //
                // We use `will_wake` to avoid cloning the waker unnecessarily.
                if self.waker.as_ref().map_or(true, |old| !old.will_wake(cx.waker())) {
                    self.waker = Some(cx.waker().clone());
                }

                std::task::Poll::Pending
            },
        }
    }
}

// Simple example.
#[async_std::main]
async fn main() {
    let mut timer = ResettableTimer::new(std::time::Duration::from_secs(15));
    timer.reset(std::time::Duration::from_secs(2));
    println!("Timer started...");
    timer.await;
    println!("Timer completed!");
}

/* More complex example demonstrating that multiple tasks can await
 * the same `ResettableTimer` instance, and that resetting the timer
 * affects all tasks. 
 * Important because of: https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/3287#note_3267354
 *
 * Due to the Mutex lock task 2 awaits the timer after task 1, but
 * both will complete after 5 seconds because of the reset and the
 * completion status being shared.
 *
#[async_std::main]
async fn main() {
    let timer = ResettableTimer::new(std::time::Duration::from_secs(25));
    let pinned = std::sync::Arc::new(async_std::sync::Mutex::new(Box::pin(timer)));

    let p1 = pinned.clone();
    let f1 = async move {
        println!("Task 1 awaiting sleep...");
        let mut fut = p1.lock().await;
        (&mut *fut).await;
        println!("Task 1 woke up!");
    };

    let p2 = pinned.clone();
    let f2 = async move {
        println!("Task 2 awaiting sleep...");
        let mut fut = p2.lock().await;
        (&mut *fut).await;
        println!("Task 2 woke up!");
    };

    {
        let mut fut = pinned.lock().await;
        std::pin::Pin::new(&mut **fut).reset(std::time::Duration::from_secs(5));
    }

    futures::join!(f1, f2);
}
 */
[package]
name = "resettable-timer-demo"
version = "0.1.0"
edition = "2024"

[dependencies]
async-io = "2.6.0"
async-std = { version = "1.13.2", features = ["attributes"] }
futures = "0.3.31"

Your code has two critical undefined behavior bugs. Nick Mathewson's suggestion for using pin_project here was a really important tip: Draft: sleepfuture-reset: Create new `SleepFuture` trait with `reset` method (!3287) · Merge requests · The Tor Project / Core / Arti · GitLab

Take a look at what that implementation could look like:

// Cargo.toml dependencies needed:
// [dependencies]
// async-io = "2.6.0"
// pin-project = "1.1"
//
// [dev-dependencies]
// async-std = { version = "1.13", features = ["attributes"] }
// futures = "0.3"

use pin_project::pin_project;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll, Waker};
use std::time::{Duration, Instant};

/// A resettable timer that wraps async_io::Timer with the ability to
/// change the deadline without creating a new future.
#[pin_project]
pub struct ResettableTimer {
    #[pin]
    timer: async_io::Timer,
    waker: Option<Waker>,
    completed: bool,
}

impl ResettableTimer {
    /// Create a new timer that completes at the given deadline.
    pub fn new(deadline: Instant) -> Self {
        Self {
            timer: async_io::Timer::at(deadline),
            waker: None,
            completed: false,
        }
    }

    /// Create a new timer that completes after the given duration from now.
    pub fn after(duration: Duration) -> Self {
        Self::new(Instant::now() + duration)
    }

    /// Reset the timer to a new deadline.
    ///
    /// This changes when the timer will complete. If a task is currently
    /// waiting on this timer, it will be woken to re-poll with the new deadline.
    pub fn reset(self: Pin<&mut Self>, deadline: Instant) {
        let mut this = self.project();

        // Replace the timer with a new one at the new deadline
        this.timer.set(async_io::Timer::at(deadline));
        *this.completed = false;

        // Wake any waiting task so it re-polls with the new timer
        if let Some(waker) = this.waker.take() {
            waker.wake();
        }
    }

    /// Check if the timer has completed.
    pub fn is_completed(&self) -> bool {
        self.completed
    }
}

impl Future for ResettableTimer {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.project();

        if *this.completed {
            return Poll::Ready(());
        }

        match this.timer.poll(cx) {
            Poll::Ready(_) => {
                *this.completed = true;
                Poll::Ready(())
            }
            Poll::Pending => {
                // Update the waker if it has changed
                if this.waker.as_ref().is_none_or(|w| !w.will_wake(cx.waker())) {
                    *this.waker = Some(cx.waker().clone());
                }
                Poll::Pending
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicBool, Ordering};

    #[async_std::test]
    async fn test_basic_timer() {
        let start = Instant::now();
        let timer = ResettableTimer::after(Duration::from_millis(50));
        timer.await;
        let elapsed = start.elapsed();

        assert!(elapsed >= Duration::from_millis(50));
        assert!(elapsed < Duration::from_millis(100));
    }

    #[async_std::test]
    async fn test_reset_extends_time() {
        let start = Instant::now();
        let timer = ResettableTimer::after(Duration::from_millis(100));
        let mut pinned = std::pin::pin!(timer);

        // Wait a bit, then reset to extend the deadline
        async_std::task::sleep(Duration::from_millis(30)).await;
        pinned
            .as_mut()
            .reset(Instant::now() + Duration::from_millis(50));

        pinned.await;
        let elapsed = start.elapsed();

        // Should take at least 80ms total (30ms + 50ms)
        assert!(elapsed >= Duration::from_millis(80));
    }

    #[async_std::test]
    async fn test_reset_shortens_time() {
        let start = Instant::now();
        let timer = ResettableTimer::after(Duration::from_millis(200));
        let mut pinned = std::pin::pin!(timer);

        // Immediately reset to a shorter time
        pinned
            .as_mut()
            .reset(Instant::now() + Duration::from_millis(50));

        pinned.await;
        let elapsed = start.elapsed();

        // Should complete in about 50ms, not 200ms
        assert!(elapsed < Duration::from_millis(100));
    }

    #[async_std::test]
    async fn test_completion_status() {
        let timer = ResettableTimer::after(Duration::from_millis(50));
        let mut pinned = std::pin::pin!(timer);

        assert!(!pinned.is_completed());

        pinned.as_mut().await;

        assert!(pinned.is_completed());
    }

    #[async_std::test]
    async fn test_multiple_resets() {
        let timer = ResettableTimer::after(Duration::from_millis(100));
        let mut pinned = std::pin::pin!(timer);

        // Reset multiple times before completion
        for _ in 0..5 {
            async_std::task::sleep(Duration::from_millis(10)).await;
            pinned
                .as_mut()
                .reset(Instant::now() + Duration::from_millis(100));
        }

        let start = Instant::now();
        pinned.await;
        let elapsed = start.elapsed();

        // Final wait should be about 100ms
        assert!(elapsed >= Duration::from_millis(100));
        assert!(elapsed < Duration::from_millis(150));
    }

    #[async_std::test]
    async fn test_shared_timer_multiple_tasks() {
        use async_std::sync::Mutex;

        let timer = ResettableTimer::after(Duration::from_millis(100));
        let shared = Arc::new(Mutex::new(timer));

        let completed1 = Arc::new(AtomicBool::new(false));
        let completed2 = Arc::new(AtomicBool::new(false));

        let shared1 = shared.clone();
        let completed1_clone = completed1.clone();
        let task1 = async_std::task::spawn(async move {
            let mut timer = shared1.lock().await;
            let pinned = std::pin::pin!(&mut *timer);
            pinned.await;
            completed1_clone.store(true, Ordering::SeqCst);
        });

        let shared2 = shared.clone();
        let completed2_clone = completed2.clone();
        let task2 = async_std::task::spawn(async move {
            // Wait a bit to ensure task1 starts first
            async_std::task::sleep(Duration::from_millis(10)).await;
            let mut timer = shared2.lock().await;
            let pinned = std::pin::pin!(&mut *timer);
            pinned.await;
            completed2_clone.store(true, Ordering::SeqCst);
        });

        // Reset the shared timer to a shorter duration
        async_std::task::sleep(Duration::from_millis(20)).await;
        {
            let mut timer = shared.lock().await;
            Pin::new(&mut *timer).reset(Instant::now() + Duration::from_millis(30));
        }

        task1.await;
        task2.await;

        assert!(completed1.load(Ordering::SeqCst));
        assert!(completed2.load(Ordering::SeqCst));
    }

    #[async_std::test]
    async fn test_reset_to_past_completes_immediately() {
        let timer = ResettableTimer::after(Duration::from_millis(100));
        let mut pinned = std::pin::pin!(timer);

        // Reset to a time in the past
        pinned
            .as_mut()
            .reset(Instant::now() - Duration::from_millis(10));

        let start = Instant::now();
        pinned.await;
        let elapsed = start.elapsed();

        // Should complete almost immediately
        assert!(elapsed < Duration::from_millis(20));
    }

    #[async_std::test]
    async fn test_poll_after_completion_returns_ready() {
        let timer = ResettableTimer::after(Duration::from_millis(10));
        let mut pinned = std::pin::pin!(timer);

        // Complete the timer
        pinned.as_mut().await;
        assert!(pinned.is_completed());

        // Polling again should return Ready immediately
        use futures::future::poll_fn;
        let result = poll_fn(|cx| {
            let poll_result = pinned.as_mut().poll(cx);
            Poll::Ready(poll_result)
        })
        .await;

        assert!(matches!(result, Poll::Ready(())));
    }

    #[async_std::test]
    async fn test_concurrent_awaits_with_select() {
        use futures::FutureExt;
        use futures::select;

        let timer1 = ResettableTimer::after(Duration::from_millis(50));
        let timer2 = ResettableTimer::after(Duration::from_millis(100));

        let pinned1 = std::pin::pin!(timer1);
        let pinned2 = std::pin::pin!(timer2);

        let start = Instant::now();

        select! {
            _ = pinned1.fuse() => {
                let elapsed = start.elapsed();
                assert!(elapsed >= Duration::from_millis(50));
                assert!(elapsed < Duration::from_millis(80));
            }
            _ = pinned2.fuse() => {
                panic!("timer2 should not complete first");
            }
        }
    }
}

Why Your Code is Unsound

1. Unsound pin projection in poll()

// YOUR CODE - WRONG:
match std::pin::Pin::new(&mut self.timer).poll(cx) {

Problem: Pin::new() requires the value never moved before. But self.timer is just a struct field - nothing prevents it from being moved. After async_io::Timer is polled once, it contains pointers to itself (for reactor registration). If the struct moves, these pointers become dangling.

Result: Undefined behavior - memory corruption, crashes, or silent data corruption.


2. Dropping pinned data in reset()

// YOUR CODE - WRONG:
pub fn reset(&mut self, duration: std::time::Duration) {
    self.timer = async_io::Timer::after(duration);  // Drops old timer!

Problem: After polling, timer is pinned and may have registered itself with the async reactor. This line:

  1. Drops the old timer (reactor now has dangling pointer)
  2. Creates new timer in same location
  3. Reactor wakes a freed memory address

Result: Undefined behavior - use-after-free.


How pin_project Fixes This

#[pin_project]
pub struct ResettableTimer {
    #[pin]
    timer: async_io::Timer,  // ← Marks this field as "structurally pinned"
    waker: Option<Waker>,
    completed: bool,
}

impl Future for ResettableTimer {
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.project();  // ← Safe projection from Pin<&mut Self> to Pin<&mut timer>

        match this.timer.poll(cx) {  // ← this.timer is Pin<&mut Timer>, not &mut Timer
            // ...
        }
    }
}

What pin_project does:

  1. Generates safe projection code that maintains pin guarantees
  2. Ensures if ResettableTimer is pinned, then timer is also pinned
  3. Provides .set() method for replacing pinned field contents safely
  4. All generated code is verified to be sound

Bottom Line

Without pin_project: You're lying to the compiler about pinning guarantees → undefined behavior

With pin_project: Compiler-verified safe pin projections → guaranteed memory safety

Your code appears to work because you're "getting lucky" - the UB hasn't manifested yet. But it could break with different optimization levels, different async runtimes, or different timing.

This is entirely incorrect.

  1. Pinning does not require the value to not have previously moved. It requires that it will never move after the pinning.
  2. Pin::new() is a safe function which operates only on Unpin types, which do not depend on pinning. If async_io::Timer were not Unpin, this code would not compile.
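To make the second point concrete, here is a minimal std-only sketch. `Wrapper`, `NotUnpin`, `NoopWake`, `assert_unpin` and `poll_wrapper_once` are hypothetical names invented for illustration, and `std::future::Ready` stands in for `async_io::Timer` on the assumption that the only relevant property is that both are `Unpin`.

```rust
use std::future::{ready, Future, Ready};
use std::marker::PhantomPinned;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Compile-time check: this call only type-checks when `T: Unpin`.
fn assert_unpin<T: Unpin>() {}

/// Hypothetical `!Unpin` type, shown only for contrast.
#[allow(dead_code)]
struct NotUnpin {
    _pin: PhantomPinned,
}

/// Hypothetical wrapper whose inner future (`Ready<u8>`) is `Unpin`.
struct Wrapper {
    inner: Ready<u8>,
}

impl Future for Wrapper {
    type Output = u8;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<u8> {
        // Safe: `Pin::new` is only callable because `Ready<u8>: Unpin`.
        Pin::new(&mut self.inner).poll(cx)
    }
}

/// Waker that does nothing, enough to poll a future by hand.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls `Wrapper` once and returns its output if it is ready.
pub fn poll_wrapper_once() -> Option<u8> {
    assert_unpin::<Wrapper>();
    // assert_unpin::<NotUnpin>(); // rejected by the compiler: `NotUnpin` is `!Unpin`

    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Wrapper { inner: ready(7) };
    match Pin::new(&mut fut).poll(&mut cx) {
        Poll::Ready(value) => Some(value),
        Poll::Pending => None,
    }
}
```

Swapping the field type for something `!Unpin` turns the `Pin::new` call into a compile error rather than silent undefined behavior, which is the whole point of the safe API.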

The claim about reset() is not correct either. If the reactor had such a pointer (in fact, we know it does not, because Timer implements Unpin), this code would still be correct, because the safety condition for presenting a value as pinned (i.e. using unsafe Pin::new_unchecked()) is that it must not be moved before it is dropped. In this case, it is being dropped without being moved. (If you used std::mem::replace instead, say, that would be incorrect, since it would move the value.)
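As a rough illustration of "dropped, without being moved", the following std-only sketch replaces a pinned `!Unpin` value with `Pin::set` and checks that the old value was dropped exactly once while the storage address stayed the same. `Tracked` and `replace_in_place` are hypothetical names, not anything from async-io or the merge request.

```rust
use std::cell::Cell;
use std::marker::PhantomPinned;
use std::pin::Pin;
use std::rc::Rc;

/// Hypothetical `!Unpin` value that counts how many times it is dropped.
struct Tracked {
    id: u32,
    drops: Rc<Cell<usize>>,
    _pin: PhantomPinned,
}

impl Drop for Tracked {
    fn drop(&mut self) {
        self.drops.set(self.drops.get() + 1);
    }
}

/// Replaces a pinned value in place and returns
/// (address unchanged, drops observed so far, id of the value now stored).
pub fn replace_in_place() -> (bool, usize, u32) {
    let drops = Rc::new(Cell::new(0));
    let make = |id| Tracked { id, drops: drops.clone(), _pin: PhantomPinned };

    let mut slot: Pin<Box<Tracked>> = Box::pin(make(1));
    let before: *const Tracked = &*slot;

    // `Pin::set` drops the old value where it sits and writes the new one
    // into the same memory. The old value is never moved out, so the pin
    // contract holds. `std::mem::replace` would need `&mut Tracked`, which
    // safe code cannot get from `Pin<&mut Tracked>` for a `!Unpin` type.
    slot.as_mut().set(make(2));

    let after: *const Tracked = &*slot;
    (before == after, drops.get(), slot.id)
}
```

A plain field assignment such as `self.timer = ...` has the same drop-then-write behavior; `Pin::set` is simply the safe way to express it through a `Pin<&mut T>`.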

Also, see the general principle: this code is safe, therefore it cannot be responsible for causing undefined behavior; if the program has any UB, then the fault lies with some unsafe code, not this code.

Please learn more about both unsafe and pinning before posting such bold claims.

Thank you for the correction! You're absolutely right, if the code compiles with safe functions, it can't be causing UB. Apologies for the incorrect analysis.
Seems I've inadvertently applied Cunningham's Law: "The best way to get the right answer on the Internet is to post the wrong answer." Worked well for both Niel and me to learn from your response.
The real improvement here would be to use timer.set_at(deadline) instead of creating a new timer each time. Thanks for taking the time to set this straight.

the async_io::Timer already supports resetting: there are Timer::set_after() and Timer::set_at(), which re-schedule the timer in the reactor while keeping the registered waker in place. this is different from constructing a new timer and inserting it into the reactor's timer queue.

or do you have a different use case?

Yes indeed, I already noticed that and used that in my smol-implementation here.

Using set_at seems to keep the waker, but I don't think that is sufficient for our use case by reading nickm's comment:

"I don't think this has the expected behavior; calling reset() on an AsyncStdSleep future will not cause anybody who was waiting on the AsyncStdSleep previously to wake up in response to the new timer expiring."

Maybe I misunderstand it, but I interpreted it to mean that if we .await the timer in one task and the duration is then reset from another task, the timer should still expire after the new duration. This is why I added the completed boolean.

do you need to use a single timer as synchronization event for multiple async tasks? that is certainly beyond what a simple async fn sleep(duration) can offer.

in that case, I think you need a multi waker registration to achieve that goal.

also, I think it's not suitable to implement Future directly on the timer struct, it's better to use some form of async "handle" type, where a single timer object can derive multiple await-able handles, the API could be used like this:

let timer = Timer::after(duration);

let expire = timer.expire();
spawn(async move {
    println!("task 1 sleeping");
    expire.await;
    println!("task 1 awaken");
});

let expire = timer.expire();
spawn(async move {
    println!("task 2 sleeping");
    expire.await;
    println!("task 2 awaken");
});

// main task can reset or cancel the timer
timer.reset(new_duration);

// or `await` for expiration too:
timer.expire().await;

this api style is borrowed from the event-listener crate, where you create an Event that can be signaled/notified, but you await on a separate EventListener type that was derived from the Event. (btw, EventListener also supports a blocking api via the Listener trait so you can use it in non-async context).

as a convenience, you can implement IntoFuture for the Timer if the most common usages only involve a single task.
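A minimal std-only sketch of that convenience could look as follows. `Timer`, `Expire`, `State`, `fire` and `demo_into_future` are hypothetical names; there is no real reactor here, so `fire()` stands in for the deadline passing, and `Timer` is made `Clone` only so the sketch can keep a second handle to trigger it.

```rust
use std::future::{Future, IntoFuture};
use std::pin::{pin, Pin};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical shared state; a real timer would hold a deadline as well.
#[derive(Default)]
struct State {
    expired: bool,
    waker: Option<Waker>,
}

/// Hypothetical owner type (assumption: cloning shares the same state).
#[derive(Clone, Default)]
pub struct Timer {
    state: Arc<Mutex<State>>,
}

/// Awaitable handle derived from a `Timer`.
pub struct Expire {
    state: Arc<Mutex<State>>,
}

impl Timer {
    pub fn expire(&self) -> Expire {
        Expire { state: self.state.clone() }
    }

    /// Stand-in for the reactor firing the timer.
    pub fn fire(&self) {
        let mut state = self.state.lock().unwrap();
        state.expired = true;
        if let Some(waker) = state.waker.take() {
            waker.wake();
        }
    }
}

impl Future for Expire {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        let mut state = self.state.lock().unwrap();
        if state.expired {
            return Poll::Ready(());
        }
        // Single-waker registration is enough for this one-task sketch.
        state.waker = Some(cx.waker().clone());
        Poll::Pending
    }
}

/// Lets single-task callers write `timer.await` directly.
impl IntoFuture for Timer {
    type Output = ();
    type IntoFuture = Expire;

    fn into_future(self) -> Expire {
        self.expire()
    }
}

/// Waker that does nothing, enough to poll by hand.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Returns (pending before firing, ready after firing).
pub fn demo_into_future() -> (bool, bool) {
    let timer = Timer::default();
    let reactor_side = timer.clone();
    // `timer.await` desugars to `timer.into_future().await`.
    let mut task = pin!(async move { timer.await });

    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let pending_before = task.as_mut().poll(&mut cx).is_pending();
    reactor_side.fire();
    (pending_before, task.as_mut().poll(&mut cx).is_ready())
}
```

Note that `into_future` consumes the `Timer`, so this shortcut only suits callers that no longer need to reset it; everyone else keeps the timer and awaits `timer.expire()`.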

Thanks for your answer. I like that API a lot! It indeed seems a lot cleaner for multiple tasks.

@nerditation I followed your advice and made an attempt on a new implementation.

I made a shared type Inner containing the state (timer, wakers, complete status). ResettableTimer holds a strong reference (an Arc) to this type. Then I made a separate type Expire which holds a weak reference to Inner.

ResettableTimer now has a method expire() which returns Expire. This functions as the "handle". The Future trait is now implemented for Expire, so we no longer await ResettableTimer itself but separate instances of Expire for each task.

Each instance of Expire shares the same knowledge of the inner state because they contain a weak reference to the same Inner as the "global" ResettableTimer. This way, when we reset the duration, all instances of Expire know it. With the weak reference we can also temporarily upgrade the reference to a full Arc to push the waker of the task the specific Expire runs in.

Is this what you had in mind? Thanks a lot for your help!

struct Inner {
    pub timer: async_io::Timer,
    pub wakers: std::vec::Vec<std::task::Waker>,
    pub completed: bool,
}

pub struct ResettableTimer {
    inner: std::sync::Arc<std::sync::Mutex<Inner>>,
}

impl ResettableTimer {
    pub fn new(duration: std::time::Duration) -> Self {
        Self {
            inner: std::sync::Arc::new(std::sync::Mutex::new(Inner {
                timer: async_io::Timer::after(duration),
                wakers: Vec::new(),
                completed: false,
            })),
        }
    }

    pub fn reset(&mut self, duration: std::time::Duration) {
        let mut inner = self.inner.lock().unwrap();
        inner.timer.set_at(std::time::Instant::now() + duration);
        inner.completed = false;
        for waker in inner.wakers.drain(..) {
            waker.wake();
        }
    }

    pub fn expire(&self) -> Expire {
        Expire {
            inner: std::sync::Arc::downgrade(&self.inner),
        }
    }
}

pub struct Expire {
    inner: std::sync::Weak<std::sync::Mutex<Inner>>,
}

impl Future for Expire {
    type Output = ();

    fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Self::Output> {
        let inner_arc = match self.inner.upgrade() {
            Some(arc) => arc,
            None => return std::task::Poll::Ready(()),
        };
        let mut inner = inner_arc.lock().unwrap();

        if inner.completed {
            return std::task::Poll::Ready(());
        }

        match std::pin::Pin::new(&mut inner.timer).poll(cx) {
            std::task::Poll::Ready(_) => {
                inner.completed = true;

                for waker in inner.wakers.drain(..) {
                    waker.wake();
                }

                std::task::Poll::Ready(())
            }
            std::task::Poll::Pending => {
                if !inner.wakers.iter().any(|w| w.will_wake(cx.waker())) {
                    inner.wakers.push(cx.waker().clone());
                }
                
                std::task::Poll::Pending
            }
        }
    }
}

#[async_std::main]
async fn main() {
    println!("Starting timer...");
    let mut timer = ResettableTimer::new(std::time::Duration::from_secs(5));

    let handle1 = timer.expire();
    async_std::task::spawn(async move {
        println!("Handle 1 waiting...");
        handle1.await;
        println!("Handle 1 expired!");
    });
    
    let handle2 = timer.expire();
    async_std::task::spawn(async move {
        println!("Handle 2 waiting...");
        handle2.await;
        println!("Handle 2 expired!");
    });


    timer.reset(std::time::Duration::from_secs(15));

    timer.expire().await;
    println!("Timer expired!");
}

that's pretty much how I would implement this api.

since you use Weak instead of Arc for the handle type, I assume you want the "outside" to have full control over the timer and don't want awaiting tasks to keep the timer alive, in which case I would probably add a Drop impl to ResettableTimer (or Inner) to wake up all the tasks (Waker will NOT automatically wake the task when dropped. the tasks, at least one of them, might still be woken when the reactor schedules the expired timer, but I assume you want cancellation semantics).

depending on the use case, the Expire future could also yield a boolean flag (or Option or Result, instead of just ()), to indicate if the wakeup is expiration or cancellation.
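A possible shape for both suggestions, as a std-only sketch: `Outcome`, the `cancelled` flag, `CountingWake` and `demo_cancel` are hypothetical additions, and the `async_io::Timer` field is left out so the example has no dependencies (which is why `completed` is never set here). The explicit `cancelled` flag is an assumption of this sketch: it covers the window in which a woken task re-polls before the last `Arc` is released.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex, Weak};
use std::task::{Context, Poll, Wake, Waker};

/// Why an `Expire` handle completed (hypothetical output type).
#[derive(Debug, PartialEq)]
pub enum Outcome {
    Expired,
    Cancelled,
}

/// Shared state; the real `Inner` would also own the `async_io::Timer`.
#[derive(Default)]
struct Inner {
    wakers: Vec<Waker>,
    completed: bool,
    cancelled: bool,
}

#[derive(Default)]
pub struct ResettableTimer {
    inner: Arc<Mutex<Inner>>,
}

pub struct Expire {
    inner: Weak<Mutex<Inner>>,
}

impl ResettableTimer {
    pub fn expire(&self) -> Expire {
        Expire { inner: Arc::downgrade(&self.inner) }
    }
}

impl Drop for ResettableTimer {
    fn drop(&mut self) {
        // Mark cancellation first, so a task that re-polls before the `Arc`
        // is released still observes it; then wake everyone explicitly.
        let wakers = {
            let mut inner = self.inner.lock().unwrap();
            inner.cancelled = true;
            std::mem::take(&mut inner.wakers)
        };
        for waker in wakers {
            waker.wake();
        }
    }
}

impl Future for Expire {
    type Output = Outcome;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Outcome> {
        let Some(shared) = self.inner.upgrade() else {
            return Poll::Ready(Outcome::Cancelled);
        };
        let mut inner = shared.lock().unwrap();
        if inner.cancelled {
            return Poll::Ready(Outcome::Cancelled);
        }
        if inner.completed {
            return Poll::Ready(Outcome::Expired);
        }
        inner.wakers.push(cx.waker().clone());
        Poll::Pending
    }
}

/// Waker that counts how often it is woken.
struct CountingWake(AtomicUsize);

impl Wake for CountingWake {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

/// Returns (pending while alive, wake-ups caused by drop, cancelled afterwards).
pub fn demo_cancel() -> (bool, usize, bool) {
    let counter = Arc::new(CountingWake(AtomicUsize::new(0)));
    let waker = Waker::from(counter.clone());
    let mut cx = Context::from_waker(&waker);

    let timer = ResettableTimer::default();
    let mut handle = timer.expire();
    let pending = Pin::new(&mut handle).poll(&mut cx).is_pending();

    drop(timer);
    let wakes = counter.0.load(Ordering::SeqCst);
    let cancelled = Pin::new(&mut handle).poll(&mut cx) == Poll::Ready(Outcome::Cancelled);
    (pending, wakes, cancelled)
}
```

Returning an enum rather than a bare `bool` keeps call sites readable (`Outcome::Cancelled` versus `false`), though a `Result` would serve equally well.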

EDIT:

I want to point out, this implementation is simpler than event-listener, but it has a flaw:

when the async runtime polls tasks spuriously (i.e. polling a task again before the previous waker is called), you may waste memory storing extra wakers, as Waker::will_wake() is only a heuristic: wakers are allowed to return false even if two wakers are linked to the same task.

this is safe, as calling an outdated waker should be ignored by the runtime, but you lose some efficiency doing so.

@nerditation Instead of having different handles for each task, could the same also be achieved by implementing Future for ResettableTimer and using a shared future?

that's an interesting idea. I have not used Shared before, but looking at the code, it uses a slab to store the wakers, and each handle is allocated a unique key, this is actually very similar to my idea to solve the spurious polling.

based on your implementation, my idea was to make the Vec<Waker> append only, then each Expire is given a slot index. when being polled, it stores the waker in its own slot. this is a very simplistic but practical approach, compared to the intrusive linked list used by event-listener.

in my scheme, the maximum wakers needed to be stored is bounded by the number of total handles created from the same timer, while for the Shared future, the memory is bounded by the number of live handles, since slab can reuse keys.
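The slot-index scheme described above could be sketched roughly like this, std-only. `Timer`, `Expire`, `fire`, `slot_count` and `demo_slots` are hypothetical names, the handle holds an `Arc` instead of a `Weak` for brevity, and `fire()` stands in for the real timer expiring.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

#[derive(Default)]
struct Inner {
    // One slot per handle ever created; the Vec only grows in `expire()`.
    slots: Vec<Option<Waker>>,
    completed: bool,
}

#[derive(Default)]
pub struct Timer {
    inner: Arc<Mutex<Inner>>,
}

pub struct Expire {
    inner: Arc<Mutex<Inner>>,
    slot: usize,
}

impl Timer {
    /// Creates a handle and reserves its private waker slot.
    pub fn expire(&self) -> Expire {
        let mut inner = self.inner.lock().unwrap();
        inner.slots.push(None);
        Expire { inner: self.inner.clone(), slot: inner.slots.len() - 1 }
    }

    /// Stand-in for the timer expiring: wakes every registered task.
    pub fn fire(&self) {
        let mut inner = self.inner.lock().unwrap();
        inner.completed = true;
        for slot in inner.slots.iter_mut() {
            if let Some(waker) = slot.take() {
                waker.wake();
            }
        }
    }

    pub fn slot_count(&self) -> usize {
        self.inner.lock().unwrap().slots.len()
    }
}

impl Future for Expire {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        let mut inner = self.inner.lock().unwrap();
        if inner.completed {
            return Poll::Ready(());
        }
        // Overwrite this handle's own slot instead of pushing, so repeated
        // polls never grow the Vec and `will_wake` is not needed at all.
        inner.slots[self.slot] = Some(cx.waker().clone());
        Poll::Pending
    }
}

/// Waker that does nothing, enough to poll by hand.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Returns (slots used after three re-polls of one handle, ready after fire).
pub fn demo_slots() -> (usize, bool) {
    let timer = Timer::default();
    let mut handle = timer.expire();
    for _ in 0..3 {
        // A fresh waker on every poll models spurious re-polls.
        let waker = Waker::from(Arc::new(NoopWake));
        let mut cx = Context::from_waker(&waker);
        assert!(Pin::new(&mut handle).poll(&mut cx).is_pending());
    }
    let slots = timer.slot_count();
    timer.fire();
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    (slots, Pin::new(&mut handle).poll(&mut cx).is_ready())
}
```

As the post says, slots are never reclaimed in this form, so memory grows with the total number of handles created; a slab-style free list would bound it by the number of live handles instead.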

in conclusion, I think Shared would work great: at the end of the day, you are basically re-implementing the functionality of Shared with the custom Future anyway.

or, you can adapt the Shared into the custom future, particularly, Slab is a better choice than Vec to store multiple Wakers.