Need help understanding tokio::time::timeout mechanics

Hello all! I'm new to both Rust and tokio, and I find myself at sea even when reading the docs. The timeout function seems straightforward enough, but then there's this scary sentence:

Note that the timeout is checked before polling the future, so if the future does not yield during execution then it is possible for the future to complete and exceed the timeout without returning an error.

Does that mean the future could overshoot the timeout by microseconds without being caught in time, and therefore technically time out without an error? Or does it mean a long-running blocking task could run forever without triggering the timeout? I assumed the former, but now I'm hitting a sticking point in my code that suggests the latter.

What I'm trying to accomplish is to detect timeouts caused by synchronous std::net::TcpStream read/write operations within an async function, but to make sure I understood the tokio timeout mechanics I first wrote the following test code:

// main.rs:
use tokio::time::{Duration, timeout};
use wrinkledytime::{concurrent_test_sleeper, sync_test_busy_driver, sync_test_sleeper_driver};

#[tokio::main(flavor = "current_thread")]
async fn main() {
    let res_async_sleep = match timeout(
        Duration::from_secs(5),
        concurrent_test_sleeper(Duration::from_secs(10)),
    )
    .await
    {
        Ok(_) => String::from("uh oh, async time is disobeying me!"),
        Err(e) => format!("async sleep timed out as expected with {}", e),
    };
    dbg!(res_async_sleep);

    let res_sync_sleep = match timeout(
        Duration::from_secs(5),
        sync_test_sleeper_driver(Duration::from_secs(10)),
    )
    .await
    {
        Ok(_) => String::from("uh oh, sync sleep time is disobeying me!"),
        Err(e) => format!("sync sleep timed out as expected with {}", e),
    };
    dbg!(res_sync_sleep);

    let res_sync_busy = match timeout(
        Duration::from_secs(5),
        sync_test_busy_driver(Duration::from_secs(10)),
    )
    .await
    {
        Ok(_) => String::from("uh oh, sync busy time is disobeying me!"),
        Err(e) => format!("sync busy timed out as expected with {}", e),
    };
    dbg!(res_sync_busy);
}

// lib.rs
use tokio::time::{Duration, sleep};

pub async fn concurrent_test_sleeper(duration: Duration) -> std::result::Result<(), &'static str> {
    sleep(duration).await;
    Ok(())
}

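pub async fn sync_test_sleeper_driver(duration: Duration) -> std::result::Result<(), &'static str> {
    // Calls the blocking function directly; there is no .await in this body.
    sync_test_sleeper(duration)
}

pub async fn sync_test_busy_driver(duration: Duration) -> std::result::Result<(), &'static str> {
    // Same here: the busy loop runs to completion without any .await.
    sync_test_busy(duration)
}

pub fn sync_test_sleeper(duration: Duration) -> std::result::Result<(), &'static str> {
    // Blocks the calling thread for the full duration.
    std::thread::sleep(duration);
    Ok(())
}

pub fn sync_test_busy(duration: Duration) -> std::result::Result<(), &'static str> {
    // Instant is monotonic, so it is the appropriate clock for measuring elapsed time.
    let start = std::time::Instant::now();
    // Spin until `duration` has passed, hogging the thread like CPU-bound work.
    while start.elapsed() < duration {
        std::hint::spin_loop();
    }
    eprintln!("busy for {:?}", start.elapsed());
    Ok(())
}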

The concurrent_test_sleeper() future, which follows the classic tokio::time::sleep() example, times out as expected. However, neither of the sync test functions' timeouts was detected. If I understand correctly, sync_test_sleeper() and sync_test_busy() both approximate what I would see if std::net::TcpStream operations ran long and blocked the calling thread. As I feared, tokio timeout lets both functions run for as long as they like, here twice the timeout duration. It doesn't detect the timeout after the fact either; it returns an Ok result.

This insight raises the following questions for me:

  1. How can I structure/wrap/refactor my synchronous code so that tokio timeout will monitor it successfully? I've seen that tokio has its own TcpStream struct with AsyncRead and AsyncWrite trait implementations, but it looks like wrapping the std::net::TcpStream calls in tokio::task::spawn_blocking() might also help. Is there a preferred/better approach?
  2. Why can't tokio timeout detect the timeout in the sync_test* cases above? From what I've read there's a lot of talk about polling futures, which I don't really understand, but it sounds like tokio timeout can only check whether a timeout has occurred when the measured code yields, e.g. via await. If that's correct, couldn't even well-behaved async code yield so infrequently that you get arbitrary timeout overruns? For example, if an async function is set to time out after 5 seconds but doesn't yield for 6 seconds, the earliest you can detect the timeout is a full second late.
  3. Why didn't tokio timeout detect that a timeout had occurred and return an error once the synchronous code was finally finished?
  4. How does tokio::time::sleep() operate differently from std::thread::sleep() such that tokio timeout can detect its timeout?
  5. How does timeout poll the future it is given under the hood? I've looked at the source code, but I'm afraid it's over my head. The docs could really use diagrams, please and thanks!

thanks very much!

The question to ask is: when the future is polled, how long does it run before either completing or suspending? However long that is, that is how long the timeout might be exceeded by.

That is correct. Code such as this should not be run in an async task — it should be wrapped in spawn_blocking() or similar. Your code is violating the principle of cooperative multitasking by not cooperating.

Tokio's TcpStream does true async IO, which can be cancelled at any time; blocking code cannot.[1] spawn_blocking() runs blocking code on a separate thread, where it still cannot be cancelled. It only ensures that the future you get back (a JoinHandle) is well-behaved, since that future doesn't run the blocking operation itself; it merely acts as a channel through which you can wait for completion.

Insofar as you want to be able to promptly cancel operations, you must not use spawn_blocking(). If you don’t mind the operation continuing in the background but do want the caller to be able to move on and ignore it, spawn_blocking() will do.

If the code yields infrequently, then it is not well-behaved async code.

If you look at the implementation you can see that, whenever it polls the inner future, if the inner future completes and returns a value, then the timeout future will always return that value:

// First, try polling the future
if let Poll::Ready(v) = me.value.poll(cx) {
    return Poll::Ready(Ok(v));
}
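The same polling order can be reproduced in a deliberately simplified, std-only model. This sketch is not tokio's code: `ToyTimeout`, `toy_timeout`, `block_on`, `YieldOnce` and the two job functions are hypothetical, and the deadline is checked by reading `Instant::now()` directly rather than through tokio's timer driver. It polls the inner future first and only then checks the deadline, which reproduces the question's observation: a future that blocks inside a single `poll` returns `Ok` however late it is, while one that returns `Poll::Pending` periodically gets cut off.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::time::{Duration, Instant};

/// Simplified stand-in for tokio's Timeout (hypothetical, for illustration).
struct ToyTimeout<F> {
    inner: Pin<Box<F>>,
    deadline: Instant,
}

impl<F: Future> Future for ToyTimeout<F> {
    type Output = Result<F::Output, &'static str>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // 1. Poll the inner future first; if it finished, its value wins.
        if let Poll::Ready(v) = self.inner.as_mut().poll(cx) {
            return Poll::Ready(Ok(v));
        }
        // 2. Only when the inner future is Pending does the deadline get checked.
        if Instant::now() >= self.deadline {
            return Poll::Ready(Err("elapsed"));
        }
        // A real timer would schedule a wake-up at the deadline; this toy
        // relies on the inner future waking itself (see YieldOnce).
        Poll::Pending
    }
}

fn toy_timeout<F: Future>(limit: Duration, fut: F) -> ToyTimeout<F> {
    ToyTimeout { inner: Box::pin(fut), deadline: Instant::now() + limit }
}

/// Returns Pending once (after asking to be polled again), then Ready.
struct YieldOnce(bool);

impl Future for YieldOnce {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            return Poll::Ready(());
        }
        self.0 = true;
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}

/// Blocks for the whole duration inside one poll, like the question's sync_test_* drivers.
async fn blocking_job(d: Duration) -> &'static str {
    std::thread::sleep(d);
    "done"
}

/// Same total work, but returns Pending after every 10 ms slice.
async fn yielding_job(d: Duration) -> &'static str {
    let start = Instant::now();
    while start.elapsed() < d {
        std::thread::sleep(Duration::from_millis(10));
        YieldOnce(false).await;
    }
    "done"
}

/// Waker that unparks the thread running block_on.
struct ThreadWaker(std::thread::Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Minimal single-future executor: poll, park until woken, repeat.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(ThreadWaker(std::thread::current())));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(v) = fut.as_mut().poll(&mut cx) {
            return v;
        }
        std::thread::park();
    }
}

fn main() {
    let limit = Duration::from_millis(50);
    let work = Duration::from_millis(200);
    println!("blocking: {:?}", block_on(toy_timeout(limit, blocking_job(work)))); // Ok("done")
    println!("yielding: {:?}", block_on(toy_timeout(limit, yielding_job(work)))); // Err("elapsed")
}
```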

This is actually a very useful property that is closely related to “cancellation safety”; for example, if you combine timeout() with a channel's recv() in a loop, you can be assured that either you got the value from recv() completing, or the timeout occurred and no value was consumed from the channel.

If timeout() behaved the way you proposed, then some values from the channel might be consumed and discarded when the timing is unlucky. timeout() could work that way, but it would be less useful.

tokio::time::sleep() doesn't block the calling thread: it registers a timer with the runtime and returns Poll::Pending until the deadline passes, so it is well-behaved async code.


[1] In the general case; specific blocking operations may have some means of cancellation.


Thanks, that helps a lot! So given

// First, try polling the future
if let Poll::Ready(v) = me.value.poll(cx) {
    return Poll::Ready(Ok(v));
}

when tokio timeout calls me.value.poll(cx) on my uncooperative blocking future, that call is stuck until the underlying blocking code returns. At that point the future is Ready, so timeout returns Ok regardless of how long it took?


That's right. Any future that takes significant time should be returning Poll::Pending at least once (depending on how many times it wakes up, which could be once at the end, or repeatedly), and each of those gives the timeout a chance to check.

