Taking action on Tokio timer expiry

I'm exploring timers, using (again) the echo-udp example as a starting point.

I wanted to extend it so that, on receipt of a datagram, it would do something after a delay, perhaps answer back as before, or maybe just print something to the console. In either case, the delay shouldn't block other datagrams from being received and processed.

There is a tokio-timer crate, but after studying the documentation for it, I still couldn't quite figure out how to approach the problem. I tried modifying the poll() method to simply print a message after a delay. The whole method was this:

fn poll(&mut self) -> Poll<(), io::Error> {
    loop {
        if let Some((size, peer)) = self.to_send {
            let timer = Timer::default();
            let sleep = timer.sleep(Duration::from_millis(500));
            sleep.and_then(|result| {
                println!("timer expired {:?}", result);
            });
            self.to_send = None;
        }

        self.to_send = Some(try_nb!(self.socket.recv_from(&mut self.buf)));
    }
}

That didn't compile.

error[E0277]: the trait bound `(): futures::Future` is not satisfied
  --> src/main.rs:47:23
   |
47 |                 sleep.and_then(|result| {
   |                       ^^^^^^^^ the trait `futures::Future` is not implemented for `()`
   |
   = note: required because of the requirements on the impl of `futures::IntoFuture` for `()`

The error message may be because the Item associated type on tokio_timer::Sleep is () (the unit type). If that's the case, then maybe and_then() can't be used here? What's the technique for doing something when a timer expires, and how do you squirrel away information before starting the timer so the timeout can use it later?

Thanks,

Chuck


If it helps, I can show you an example of using tokio_core::reactor::Timeout directly: https://github.com/mozilla/sccache/blob/48f1196f490b3f01a63fe2fba2f40144b51aaba2/src/compiler/compiler.rs#L132

We're using that to add a time limit to a cache lookup.


I looked that over, but couldn't comprehend most of it. I'm getting tripped up on the mechanics (what are into_future() and flatten()? what does map() do? what happens when you chain_err()? and so on) and not getting anywhere near to understanding the concepts. I wonder if there are some simplified examples where the various parts are annotated. I'm finding that it only takes a couple of lines of code to get stuck at "what's happening here?"


Use map instead of and_then. This might be a little unintuitive, but and_then expects a new future back, while map just transforms the value in the success case. That transformation can include ignoring the value and simply printing something.
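
For example, roughly what that looks like with the tokio-timer API from your snippet (a minimal sketch; the future still does nothing until it's driven, so wait() is used here only to show the closure firing):

extern crate futures;
extern crate tokio_timer;

use std::time::Duration;
use futures::Future;
use tokio_timer::Timer;

fn main() {
    let timer = Timer::default();
    // map only needs a closure returning a plain value, not another future,
    // so printing (which returns ()) works here where and_then did not.
    let printed = timer
        .sleep(Duration::from_millis(500))
        .map(|_| println!("timer expired"));
    // Inside poll() you would spawn this on the event loop instead; wait()
    // just blocks the current thread until the timer fires.
    printed.wait().unwrap();
}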


It is a bit of a chicken-and-egg problem. This is not the place for attempting an extended explanation of concepts, but let's put it this way: with futures, your goal is to transform an input future into a result you need. You do it through a chain of transformations, each of which takes the result of one future and produces another future, nearer to the goal. The final future yields the desired result.

The catch is that you must do it in this way, and let go of all imperative control-flow constructs you'd employ in synchronous code. No branching except through select() and friends. No for or while, use streams. Need to pick items from a stream in a way that can't be expressed by a stream combinator? Wrap it in another stream. (And btw, a lot of things can be done with combinators.) Be on the lookout for Error type mismatches. Be prepared for monstrous type chains in error messages.

(Okay, the above is not strictly true for all cases, but approaching problems with a "functional" outlook is immensely helpful when dealing with futures.)

If it helps, picture it as a kind of torch relay: the next future must be lit by the previous one. The only other way of lighting a future is through a handle to an event loop which executes all tasks associated with the futures. When combining futures, all should be lit; a single unlit future can douse the relay. (The compiler will warn that "futures do nothing unless polled", but it can't do that in all cases.)

With that in mind, let's see how the timeout is handled in the highlighted function. Timeout::new() takes a Duration and an event loop handle, presumably our own, and returns an io::Result<Timeout> (i.e., Result<Timeout, io::Error>). Then, into_future() transforms that into a future with Item = Timeout and Error = io::Error. Since Timeout is itself a Future, we now have a future of a future, and flatten() collapses that down to the ultimate future, i.e., the Timeout. The whole dance is a way to avoid handling the Result explicitly.

From earlier transformations, we have cache_status, a future yielding the result of querying the cache, which may involve trips to Amazon's S3 and the like. We'd like to cap the duration of that query; that's what timeout is for. We combine cache_status and timeout with select(), which takes two futures and yields whichever resolves first. However, we must transform both futures before that, since timeout resolves to () and cache_status to the Cache enum: we map() the result of cache_status to Some(Cache), and the result of timeout to None, so a successful resolution of select() yields Option<Cache>. Notice that timeout expiration is also treated as a success; chain_err() is there to make the error messages for timer-related failures more informative.
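
To make that shape concrete, here is a rough, self-contained sketch of the same pattern. It is not the sccache code: the cache query is faked with future::ok, and both error types are unified to io::Error just so the example stands on its own.

extern crate futures;
extern crate tokio_core;

use std::io;
use std::time::Duration;
use futures::{future, Future, IntoFuture};
use tokio_core::reactor::{Core, Timeout};

#[allow(dead_code)]
#[derive(Debug)]
enum Cache { Hit, Miss }

fn main() {
    let mut core = Core::new().unwrap();
    let handle = core.handle();

    // Stand-in for the real cache lookup; imagine this takes a while.
    let cache_status = future::ok::<Cache, io::Error>(Cache::Hit)
        .map(Some); // a hit or miss becomes Some(Cache)

    // Timeout::new returns io::Result<Timeout>; into_future().flatten()
    // turns that Result into a plain future without an explicit match.
    let timeout = Timeout::new(Duration::from_secs(5), &handle)
        .into_future()
        .flatten()
        .map(|_| None::<Cache>); // expiry is a "success" that yields None

    // select() yields whichever future resolves first, plus the other one,
    // which we simply drop here.
    let outcome = cache_status
        .select(timeout)
        .map(|(value, _other)| value)
        .map_err(|(err, _other)| err);

    println!("{:?}", core.run(outcome).unwrap());
}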


Thanks for the overview. I do understand that the futures model involves a series of transformations, but I'm unable to put my (admittedly very shaky) understanding into practice. Let me recap the project that started this inquiry, a program that would:

  • Listen for incoming UDP packets

  • Upon receipt of a packet, note the IP address of the sending peer and the packet contents, then start a timer to do something with this information later.

  • Immediately return to listening for more incoming packets.

  • When the timer for a specific packet expires, print a message with the contents of the packet and the address of the sender.

The packet-arrival, wait-a-bit sequence sounds like a chain-of-futures kind of situation. At the end of wait-a-bit, the final future has arrived. Given that, I thought I'd start with the echo-udp example and modify it. I didn't get very far.

First up was the act of selecting the object that would perform the delay. The tokio-timer crate has Timeout, Sleep and related structs, but there is also a Timeout in tokio_core::reactor. The documentation doesn't really provide enough information on what's appropriate for various uses, and that alone was confusing. After flailing around with tokio_timer::Sleep, getting compile errors and not really understanding how it worked anyway, I decided to try the reactor::Timeout instead.

Next up was how to incorporate it. I figured the place to start the timer was inside the poll() method, at the point where recv_from() hands me a packet. To create a Timeout I'd need to give it a handle to the event loop, and I wasn't sure how to get that handle. Is there some way to obtain it inside poll()? Do I create another event loop? Also, how do I save away the information I'll use later? Is this really the right place to set this timer up? Does the timer somehow need its own poll() method? Is echo-udp really a good starting place, or is there something else?

This thicket of questions probably makes it obvious I'm finding Tokio pretty impenetrable. Is there a place to find accessible explanations and lots of examples to build on? I have gone over the docs on the website, but so far, no breakthrough.


Let me preface this by saying I've not played around with tokio much, so what I'm about to show below is likely not ideal. 🙂

So, assuming you want the timeout to resolve on the event loop (Core), you probably want to use tokio_core::reactor::Timeout. The constructors there require a tokio_core::reactor::Handle reference, which is basically a handle back to the Core. Let's modify the Server from that UDP example as follows:

struct Server<'a> {
    socket: UdpSocket,
    buf: Vec<u8>,
    to_send: Option<(usize, SocketAddr)>,
    handle: &'a Handle
}

impl<'a> Future for Server<'a> {
...
}


l.run(Server {
        socket: socket,
        buf: vec![0; 1024],
        to_send: None,
        handle: &handle
    }).unwrap();

OK, so now we have a Handle that Timeout will use. I added the following code to poll():

// If we're here then `to_send` is `None`, so we take a look for the
// next message we're going to echo back.
self.to_send = Some(try_nb!(self.socket.recv_from(&mut self.buf)));

let timeout = Timeout::new(Duration::from_secs(5), self.handle).unwrap();
let peer2 = self.to_send.unwrap().1.clone();
let mut buf_clone = self.buf.clone();
// Shrink the clone down to just the bytes we actually received.
unsafe { buf_clone.set_len(self.to_send.unwrap().0); }

let cb = timeout
    .map(move |_| println!("after timeout of peer {:?}, contents: {:?}",
                           peer2, ::std::str::from_utf8(&buf_clone)))
    .map_err(|err| println!("{:?}", err));
self.handle.spawn(cb);

As some have mentioned, we map the result of the timeout (i.e. its completion), and this gives us a new future representing that whole operation. We then spawn that future back onto the reactor/event loop.

Note that I cloned gratuitously here, but I hope that's OK for a simple example. This might be enough to get you going and/or give you an idea.


That UDP echo example is perhaps not the best place for initial experimentation, since it doesn't use a chain of futures which may be manipulated and extended, but is itself an implementation of a future which runs until interrupted. You definitely don't want to block in poll(); see the API docs. Something closer to the traditional server example could use UdpSocket::framed(), which requires writing a codec.
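
For illustration, a codec for that framed() approach can be quite small. Here is a hedged sketch against the tokio_core::net::UdpCodec trait, with the In/Out types chosen arbitrarily as (SocketAddr, Vec<u8>) pairs:

extern crate tokio_core;

use std::io;
use std::net::SocketAddr;
use tokio_core::net::UdpCodec;

// Pairs each datagram's bytes with the peer's address, in both directions.
struct DatagramCodec;

impl UdpCodec for DatagramCodec {
    type In = (SocketAddr, Vec<u8>);
    type Out = (SocketAddr, Vec<u8>);

    fn decode(&mut self, addr: &SocketAddr, buf: &[u8]) -> io::Result<Self::In> {
        // Called once per received datagram; `buf` is exactly the bytes received.
        Ok((*addr, buf.to_vec()))
    }

    fn encode(&mut self, msg: Self::Out, into: &mut Vec<u8>) -> SocketAddr {
        // Append the outgoing bytes and return the destination address.
        let (addr, payload) = msg;
        into.extend_from_slice(&payload);
        addr
    }
}

Calling socket.framed(DatagramCodec) should then give you a Stream (and Sink) of those pairs, which fits the combinator style discussed above.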

As for the documentation, I can't guess what you'd find accessible enough; I can only tell you what worked for me: the combination of API docs, working code written by others (tokio-socks5 is one reasonable starting point), the Gitter channel, and time.

Yes, I think I'll take a look at tokio-socks5 because I do want to understand chains of futures.

In the short term, here's my summary of what happens in the code Vitaly added:

  • The handle to the core, which is needed to spawn new timeouts, is simply added to the struct to make it available inside poll(). That's a simple structural change.

  • The data we want to use at the conclusion of the timeout (the contents of the packet and the address of the sender) are cloned so they can be easily saved away and dropped later when no longer needed. Calling set_len() on the buffer clone trims it so it doesn't have extra bytes at the end: the buffer is a fixed 1024 bytes, but only the first size bytes reported by recv_from() actually contain the datagram.

  • When the map() method is passed the closure, ownership of these items is transferred to the closure via the move keyword. I think this means that once the closure has run and the future is dropped, these items are dropped along with it, which is Rust's ownership model at work.

  • map() is passed a closure, which implicitly implements the FnOnce trait, and returns a Map, which implements Future. The closure is invoked from within the Map's poll() method when the event loop calls it at the appointed time. The closure I understand; map() is harder to grasp. Its documentation states: "Map this future's result to a different type, returning a new future of the resulting type." From what I can tell, Timeout::new() returns a Result parameterized to contain a Timeout (which implements Future), and after unwrap() we have that Timeout; map() then turns it into a different future. The question is: why was the transformation needed, and what specific transformation occurred? If a newly created Timeout implements Future, isn't that future "sufficient"?

  • Finally, the key to adding additional things to the event loop is calling the spawn() method on the handle, which I take to mean "add this future to the event loop."
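
For what it's worth, Handle::spawn() in tokio-core only accepts futures whose Item and Error are both (), which seems to be why the example mapped the timeout's value away and turned its io::Error into a printed message. Roughly this shape (spawn_timer is just a hypothetical helper name):

extern crate futures;
extern crate tokio_core;

use futures::Future;
use tokio_core::reactor::{Handle, Timeout};

// Shape a Timeout into something spawn() accepts,
// i.e. a Future with Item = () and Error = ().
fn spawn_timer(handle: &Handle, timeout: Timeout) {
    handle.spawn(
        timeout
            .map(|_| println!("timer fired"))
            .map_err(|e| println!("timer error: {:?}", e)),
    );
}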

Slow, but steady progress. Thanks for the insights provided by everyone.