Waiting short periods of time with tokio_core::reactor::Timeout


#1

Hi, I am working on a rate limiter for outgoing UDP packets.
It is a piece of code that adjusts the speed of outgoing packets according to various inputs. I am having problems when I try to make the outgoing rate very high using tokio_core::reactor::Timeout.

My problem is as follows: after creating a new Timeout, one has to poll() it and get an Ok(Async::NotReady) result to make sure the task will be polled again. However, when I create a Timeout with a short duration and call poll() on it, I get an Ok(Async::Ready(_)) result, so the task is never polled again.

I wrote a self contained piece of code to demonstrate my problem:

extern crate futures;
extern crate tokio_core;

use std::time::Duration;
use tokio_core::reactor::{Timeout, Handle, Core};
use futures::{Async, Future, Poll};

struct TimeoutFuture {
    timeout_opt: Option<Timeout>,
    handle: Handle,
}

impl Future for TimeoutFuture {
    type Item = ();
    type Error = ();
    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        self.timeout_opt = match self.timeout_opt.take() {
            None => {
                println!("First poll() call");
                let mut timeout = Timeout::new(Duration::new(0,1000), &self.handle).unwrap();
                timeout.poll();
                Some(timeout)
            },
            Some(timeout) => {
                println!("poll() was called again!");
                Some(timeout)
            },
        };

        Ok(Async::NotReady)
    }
}

fn main() {
    let mut core = Core::new().unwrap();
    let handle = core.handle();
    let timeout_future = TimeoutFuture {
        timeout_opt: None,
        handle: handle.clone(),
    };
    core.run(timeout_future).unwrap();
}

With the code presented above (on my computer), this is the output:

$ cargo run
    Finished dev [unoptimized + debuginfo] target(s) in 1.22 secs
     Running `target/debug/timeout_check`
First poll() call

And the program is stuck. poll() is never called again.

The two lines to play with are:

let mut timeout = Timeout::new(Duration::new(0,1000), &self.handle).unwrap();
timeout.poll();

First, if I remove the timeout.poll() line, I get the same behaviour (First poll() is called, second poll() is not called, program is stuck).

If I remove the timeout.poll() and increase the duration, obtaining:

let mut timeout = Timeout::new(Duration::new(0,1000000), &self.handle).unwrap();
// timeout.poll();

I still get the same behaviour.

Finally, if I put timeout.poll() back:

let mut timeout = Timeout::new(Duration::new(0,1000000), &self.handle).unwrap();
timeout.poll();

I get this output:

$ cargo run
    Finished dev [unoptimized + debuginfo] target(s) in 1.23 secs
     Running `target/debug/timeout_check`
First poll() call
poll() was called again!

which is the behaviour I want. However, this duration (1 ms) is much longer than what I need.

I want to be able to create a new Timeout and have my task polled in the near future (according to the Duration given to the Timeout), without having to poll() the timeout. This is because in my case the Duration is very short, so by the time I poll the timeout it has already become Ready.

To solve this I tried to look at the implementation of tokio_core, mostly the TimeoutToken part in reactor/timeout_token.rs. However, it seems that this part is private, and I cannot access it from my code.

I appreciate any ideas of how to solve this problem!


#2

Your future’s poll isn’t called again because of a golden rule in futures: if you return Async::NotReady, you need to arrange for the task associated with the future to be notified when the future should be polled again. If this arrangement isn’t made, the reactor will never poll the future again (if this weren’t the case, it would have to poll it continuously, burning cycles).

Typically, your futures will wrap some other underlying future, and its poll will arrange for the task to be notified if it’s not ready; the Timeout is one such future. If you’re writing a low-level primitive, you need to arrange for this explicitly (e.g. mio network components arrange for this via the underlying selection mechanism, such as epoll on Linux).

You can do a manual notification via futures::task::current().notify() (right before returning not ready) - this is a form of a “yield”, whereby the reactor will poll your future shortly thereafter (possibly immediately if nothing else is pollable).
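To make the rule and the “yield” trick concrete, here is a minimal sketch. Note the assumptions: it uses today's std::future API rather than futures 0.1, so cx.waker().wake_by_ref() plays the role of futures::task::current().notify(), and YieldTimes, CountingWaker and run_to_completion are hypothetical names invented for this illustration (the toy executor is not how tokio's reactor is implemented).

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical future that yields `remaining` times before completing.
struct YieldTimes {
    remaining: u32,
}

impl Future for YieldTimes {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.remaining == 0 {
            return Poll::Ready(());
        }
        self.remaining -= 1;
        // Arrange the wake-up before returning Pending. Without this line
        // the executor has no reason to poll this future again.
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}

/// Counts wake-ups so the toy executor can tell whether a re-poll was requested.
struct CountingWaker {
    wakes: AtomicUsize,
}

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.wakes.fetch_add(1, Ordering::SeqCst);
    }
}

/// Toy single-future executor: re-polls only if the last poll requested a wake-up.
/// Returns the number of polls, or None if the future returned Pending
/// without arranging a wake-up (the "stuck" case from the first post).
fn run_to_completion<F: Future<Output = ()>>(fut: F) -> Option<u32> {
    let state = Arc::new(CountingWaker { wakes: AtomicUsize::new(0) });
    let waker = Waker::from(state.clone());
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    let mut polls = 0;
    loop {
        polls += 1;
        let before = state.wakes.load(Ordering::SeqCst);
        if fut.as_mut().poll(&mut cx).is_ready() {
            return Some(polls);
        }
        if state.wakes.load(Ordering::SeqCst) == before {
            return None; // nobody asked for another poll
        }
    }
}
```

Calling run_to_completion(YieldTimes { remaining: 3 }) polls four times and finishes, while run_to_completion(std::future::pending::<()>()) returns None, because that future returns Pending without ever waking the task.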

For short timeouts like you’re looking at, I’m not sure the tokio Timeout is appropriate - its docs mention it’s not intended to be a high-res timer. You can go with the “yield” approach but that may spin the cpu unnecessarily. Not sure if there’s a better approach that works well with tokio.


#3

Hi @vitalyd, thanks for your reply!
I tried using futures::task::current().notify(), by replacing the mentioned two lines with this code:

let mut timeout = Timeout::new(Duration::new(0,1000), &self.handle).unwrap();
match timeout.poll() {
    Ok(Async::Ready(())) => futures::task::current().notify(),
    _ => {},
};

It seems to work now! The Future is being polled again.

I wonder if there is a better way to deal with small time intervals in Tokio. I am willing to go low level, as long as I can have a Future like interface with the rest of the codebase.


#4

At a 1 microsecond granularity, you might actually want to stick with the yielding approach. Even if you had a high-res timer, putting the thread to sleep and waking it up will likely incur a much larger delay than your target of 1us; you’ll flood the cpu with context switching, and then putting the thread back on-core will take some time (e.g. caches, both data and instruction, may be cold).
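If you want to see the sleep/wake overhead on your own machine, a rough measurement could look like the sketch below. It uses only std::thread::sleep (not tokio), and worst_sleep is a hypothetical helper name; the numbers depend entirely on the OS and hardware, so treat it as a probe rather than a benchmark.

```rust
use std::thread;
use std::time::{Duration, Instant};

/// Hypothetical helper: sleeps for `requested`, `rounds` times in a row,
/// and returns the longest wall-clock time a single sleep actually took.
fn worst_sleep(requested: Duration, rounds: u32) -> Duration {
    let mut worst = Duration::from_secs(0);
    for _ in 0..rounds {
        let start = Instant::now();
        // sleep() never returns early, but it may return late.
        thread::sleep(requested);
        worst = worst.max(start.elapsed());
    }
    worst
}
```

For example, printing worst_sleep(Duration::from_micros(1), 1000) shows how far past the requested 1us a sleeping thread can wake up.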

If you yield, you may still miss the deadline (if other tasks hog the reactor for too long - 1us is a fairly small budget) and caches may still go cold but it’s less likely and you’ll avoid the context switches. The downside is you may burn a bit more CPU.

You can also try asking tokio devs about this on their github. I’d be interested myself on what they say/suggest.


#5

Hi, thanks for the advice!

It could take me a while until I finish adding the yielding approach into the current code base, so I can’t yet tell you how it worked out.

I just opened this issue: https://github.com/tokio-rs/tokio-core/issues/298 in the tokio_core repository, I am also interested to know if the devs have an idea of how to solve this.

real.


#6

I finished writing the rate limiter part of the code today.
Eventually, instead of trying to use Timeouts with smaller and smaller Duration, I chose to have a Timeout with constant Duration of one millisecond (I was told that this is the smallest that Tokio supports for actually measuring something). Then every millisecond I send k messages, where k changes according to the rate of incoming packets.
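As a rough illustration of the "send k messages per tick" idea (this is a sketch, not the code from the repository): assuming the desired rate is known in packets per second, k for each fixed tick can be computed with integer arithmetic, carrying the fractional part over to the next tick. TickBudget and next_batch are hypothetical names.

```rust
use std::time::Duration;

/// Hypothetical helper: decides how many packets to send on each fixed tick.
struct TickBudget {
    tick_micros: u64,
    /// Leftover fraction of a packet, in packet-microseconds (always < 1_000_000).
    remainder: u64,
}

impl TickBudget {
    fn new(tick: Duration) -> Self {
        TickBudget {
            tick_micros: tick.as_micros() as u64,
            remainder: 0,
        }
    }

    /// Returns k, the number of packets to send during this tick, for the
    /// given target rate. The fractional part is carried to the next tick,
    /// so the long-run average matches the target rate.
    fn next_batch(&mut self, packets_per_sec: u64) -> u64 {
        let total = packets_per_sec * self.tick_micros + self.remainder;
        self.remainder = total % 1_000_000;
        total / 1_000_000
    }
}
```

With a 1 ms tick and a target of 2500 packets per second, successive calls to next_batch(2500) alternate between 2 and 3, which adds up to 2500 packets over 1000 ticks. The rate argument can change from one tick to the next.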

The repository is MIT licensed and can be found here: https://github.com/realcr/fragmentos
The relevant code for the rate limiter can be found here:
https://github.com/realcr/fragmentos/blob/master/src/rate_limit.rs

Thanks for all the help!
real.


#7

Interestingly, I was going to suggest a fixed polling period but figured you wanted to avoid the latency hit, given the 1us resolution you were interested in. But yeah, if this approach works then it’s much easier to work with (and it won’t jitter the thread as much).

Looks like a neat project - good luck with it!