Hi, I am working on a rate limiter for outgoing UDP packets.
It adjusts the rate of outgoing packets according to various inputs. I am having problems using tokio_core::reactor::Timeout when the outgoing rate has to be very high, i.e. when the delay between packets is very short.
My problem is as follows: after creating a new Timeout, one has to poll() it and get an Ok(Async::NotReady) result to make sure the task will be polled again. However, when I create a Timeout with a short duration and call poll() on it, I get an Ok(Async::Ready(_)) result, so the task is never polled again.
I wrote a self-contained piece of code to demonstrate my problem:
extern crate futures;
extern crate tokio_core;

use std::time::Duration;
use tokio_core::reactor::{Timeout, Handle, Core};
use futures::{Async, Future, Poll};

struct TimeoutFuture {
    timeout_opt: Option<Timeout>,
    handle: Handle,
}

impl Future for TimeoutFuture {
    type Item = ();
    type Error = ();

    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        self.timeout_opt = match self.timeout_opt.take() {
            None => {
                println!("First poll() call");
                let mut timeout = Timeout::new(Duration::new(0, 1000), &self.handle).unwrap();
                timeout.poll();
                Some(timeout)
            },
            Some(timeout) => {
                println!("poll() was called again!");
                Some(timeout)
            },
        };
        Ok(Async::NotReady)
    }
}

fn main() {
    let mut core = Core::new().unwrap();
    let handle = core.handle();
    let timeout_future = TimeoutFuture {
        timeout_opt: None,
        handle: handle.clone(),
    };
    core.run(timeout_future).unwrap();
}
With the code above, this is the output on my computer:
$ cargo run
Finished dev [unoptimized + debuginfo] target(s) in 1.22 secs
Running `target/debug/timeout_check`
First poll() call
And the program is stuck. poll() is never called again.
The two lines to play with are:
let mut timeout = Timeout::new(Duration::new(0,1000), &self.handle).unwrap();
timeout.poll();
First, if I remove the timeout.poll() line, I get the same behaviour (the first poll() is called, the second poll() is not, and the program is stuck).
If I remove the timeout.poll() line and also increase the duration, obtaining:
let mut timeout = Timeout::new(Duration::new(0,1000000), &self.handle).unwrap();
// timeout.poll();
I still get the same behaviour.
Finally, if I put timeout.poll() back:
let mut timeout = Timeout::new(Duration::new(0,1000000), &self.handle).unwrap();
timeout.poll();
I get this output:
$ cargo run
Finished dev [unoptimized + debuginfo] target(s) in 1.23 secs
Running `target/debug/timeout_check`
First poll() call
poll() was called again!
which is the behaviour I want. However, this duration (1 ms instead of 1 µs) is far too long for my use case.
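For reference, here is a small standard-library-only sketch of the two durations used above. Duration::new takes whole seconds plus additional nanoseconds, so the two values are 1 µs and 1 ms. The helper names short_timeout, long_timeout and ratio are made up for this illustration.

```rust
use std::time::Duration;

/// The short delay from the first snippet: 0 s + 1_000 ns.
/// (Hypothetical helper name, for illustration only.)
fn short_timeout() -> Duration {
    Duration::new(0, 1_000)
}

/// The longer delay from the later snippets: 0 s + 1_000_000 ns.
/// (Hypothetical helper name, for illustration only.)
fn long_timeout() -> Duration {
    Duration::new(0, 1_000_000)
}

/// How many short delays fit into one long delay.
fn ratio() -> u32 {
    long_timeout().subsec_nanos() / short_timeout().subsec_nanos()
}
```

So the variant that works waits a thousand times longer than the one I actually need.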
I want to be able to create a new Timeout and have my task polled in the near future (after the Duration given to the timeout), without having to poll() the timeout first. In my case the Duration is very short, so I cannot poll the timeout without it immediately turning Ready.
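One possible direction, as a rough sketch rather than a confirmed fix: since Ok(Async::Ready(_)) means the timeout has already elapsed, the task could do its work right away in the same poll() and create the next Timeout, returning only once a timeout reports NotReady. The model below uses only the standard library. Async, ModelTimeout and send_burst are hypothetical stand-ins made up for illustration and are not part of tokio_core or futures; max_burst is likewise an assumed cap so that a single poll cannot loop forever.

```rust
use std::time::{Duration, Instant};

/// Stand-in for the Ready/NotReady distinction (NOT the futures crate's type).
#[derive(Debug, PartialEq)]
enum Async {
    Ready,
    NotReady,
}

/// Hypothetical model of a timer: it only compares the clock to a deadline
/// and never registers any wake-up, unlike a real reactor timeout.
struct ModelTimeout {
    deadline: Instant,
}

impl ModelTimeout {
    fn new(dur: Duration) -> ModelTimeout {
        ModelTimeout { deadline: Instant::now() + dur }
    }

    fn poll(&self) -> Async {
        if Instant::now() >= self.deadline {
            Async::Ready
        } else {
            Async::NotReady
        }
    }
}

/// Models one poll() of a sender task: every timer that is already expired
/// counts as one packet sent immediately, then a fresh timer is armed.
/// Stops at the first pending timer or after `max_burst` packets.
fn send_burst(interval: Duration, max_burst: usize) -> usize {
    let mut sent = 0;
    while sent < max_burst {
        let timeout = ModelTimeout::new(interval);
        match timeout.poll() {
            // Already expired: no wake-up will come, so act now and re-arm.
            Async::Ready => sent += 1,
            // Pending: real code would store the timer and return NotReady.
            Async::NotReady => break,
        }
    }
    sent
}
```

With a zero interval, send_burst(Duration::from_secs(0), 3) hits the cap of 3; with a long interval it returns 0 because the first timer is still pending. In real code the NotReady branch would be the place to keep the pending Timeout and return Ok(Async::NotReady).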
To solve this I tried looking at the implementation of tokio_core, mainly TimeoutToken in reactor/timeout_token.rs. However, this part seems to be private, and I cannot access it from my code.
I would appreciate any ideas on how to solve this problem!