Tokio Interval not working in runtime

I'm trying to use a tokio Interval to wake up a custom Stream/Sink future (a p2p network stack that tries to reconnect to peers from time to time). The problem is that it panics if used with the default tokio Runtime. I'm using

tokio = { version = "0.2", features = ["full"] }

and this is a minimal example to reproduce it:

    use futures::StreamExt;
    #[test]
    fn test_interval() {
        let mut rt = tokio::runtime::Runtime::new().unwrap();

        async fn test(t: Instant) {
            eprintln!("Hello World!");
        }

        rt.block_on(tokio::time::interval(core::time::Duration::from_secs(1)).for_each(test));
    }

This panics at runtime with the message there is no timer running, must be called from the context of Tokio runtime. I'm not sure if this is a bug or if I have to activate anything else in the runtime, but from what I saw Runtime::new should enable time features. Do you have any ideas how to fix the problem?

Let me rewrite it a bit:

use futures::StreamExt;
#[test]
fn test_interval() {
    let mut rt = tokio::runtime::Runtime::new().unwrap();

    async fn test(t: Instant) {
        eprintln!("Hello World!");
    }

    // This object is not created inside the runtime.
    let future = tokio::time::interval(core::time::Duration::from_secs(1)).for_each(test);

    rt.block_on(future);
}

It fails because the object is not created inside the runtime. In this case I would recommend either using #[tokio::test] like this:

use futures::StreamExt;

#[tokio::test]
fn test_interval() {
    async fn test(t: Instant) {
        eprintln!("Hello World!");
    }

    tokio::time::interval(core::time::Duration::from_secs(1))
      .for_each(test)
      .await;
}

Or you can use an async block to ensure it is created inside the runtime:

use futures::StreamExt;

#[test]
fn test_interval() {
    let mut rt = tokio::runtime::Runtime::new().unwrap();

    async fn test(t: Instant) {
        eprintln!("Hello World!");
    }

    rt.block_on(async move {
        tokio::time::interval(core::time::Duration::from_secs(1))
            .for_each(test)
            .await
    });
}

Finally you can use the enter function to temporarily enter the runtime to create the object:

use futures::StreamExt;

#[test]
fn test_interval() {
    let mut rt = tokio::runtime::Runtime::new().unwrap();

    async fn test(t: Instant) {
        eprintln!("Hello World!");
    }

    let future = rt.enter(|| {
        tokio::time::interval(core::time::Duration::from_secs(1))
            .for_each(test)
    });

    rt.block_on(future);
}
2 Likes

Thank you so much :smiley: I was debugging this for wayyy too long.

But I have to say the implicit data flow gives me creeps (why can't it just use the runtime/context present on first poll instead of requiring it at creation time?). Anyway, that means an async constructor for my custom future or initializing it lazily by creating a builder future on first poll should fix it.

We know the documentation on this is poor, and there are plans to improve both the panic message and the crate documentation on this.

The reason it isn't lazy is that the structure would become more complicated because it now has to handle two states, and this extra state is rarely needed because most users create it inside an async fn anyway. Additionally, it would still have to ask for the current time on creation, which would interact poorly with Tokio's time-mocking feature found in the test-util crate feature.

1 Like

This topic was automatically closed 90 days after the last reply. New replies are no longer allowed.