How do futures/tasks work with executors?


#1

Recently I have been trying to understand how futures-rs and tokio-rs work. My basic understanding of the design is:

  • futures provide primitives to compose futures into bigger futures
  • tasks provide methods to halt and wake themselves up (i.e. park and unpark)
  • executors/event loops actually schedule tasks: halting a task depending on the result of its poll method, and waking a task up depending on the events that are triggered

Let’s say an executor executes a task, which essentially runs the poll method of the future the task represents. If the future returns NotReady, the executor should install a callback to wake the task up once the resource it is waiting for is ready.

My confusion is: how can an executor know what resources a future is waiting for? How does it know whether a future is waiting for a socket to become ready or for some long-running computation to finish?

In the Future trait, we only need to implement the poll method, so it seems to me that futures have no way to pass the “resources I need” information to executors.
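To make the shape of the trait concrete, here is a stripped-down, std-only sketch of what the poll contract looks like. This is my own simplification, not the real futures 0.1 API: the real trait also carries an error type and an implicit task context, and all names here are invented for illustration.

```rust
// A stripped-down model of a futures-0.1-style Future. The real trait
// returns Poll<Self::Item, Self::Error>; this sketch drops the error type.
enum Async<T> {
    Ready(T),
    NotReady,
}

trait SimpleFuture {
    type Item;
    // The executor drives the future by calling poll repeatedly.
    // Note: nothing in this signature tells the executor *what* resource
    // the future is waiting on -- which is exactly the question above.
    fn poll(&mut self) -> Async<Self::Item>;
}

// A toy future that becomes ready only after being polled a few times.
struct CountDown(u32);

impl SimpleFuture for CountDown {
    type Item = &'static str;
    fn poll(&mut self) -> Async<&'static str> {
        if self.0 == 0 {
            Async::Ready("done")
        } else {
            self.0 -= 1;
            Async::NotReady
        }
    }
}

fn main() {
    let mut f = CountDown(2);
    // A naive executor: busy-poll in a loop. A real executor would park
    // the task and wait for a notification instead of spinning.
    loop {
        if let Async::Ready(v) = f.poll() {
            println!("{}", v);
            break;
        }
    }
}
```

The busy loop in main is the thing a real executor avoids; the rest of the thread is about how it avoids it.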

Any help (answer/link/doc) is appreciated!


#2

You’re aiming to understand who notifies the executor that the resource or computation is Async::Ready. The implementation is deep in tokio-core, backed by the mio crate. Have a look at:
https://docs.rs/tokio-core/0.1.8/src/tokio_core/reactor/mod.rs.html

especially:
https://docs.rs/tokio-core/0.1.8/src/tokio_core/reactor/mod.rs.html#562


#3

This is similar to other async mechanisms: interest in readiness on a socket/fd is registered, and the fd must then be polled to find out whether it is ready. The lower-level objects, like TcpStream::poll_read, automatically register this interest for you. If the fd is not ready, Tokio will automatically schedule your Future to be polled again later (i.e. you return Async::NotReady from the poll() method). You must attempt some operation that registers interest, like a read or write, for this registration to happen; otherwise the future will never be called again. To get around this limitation, futures::task::current().notify() (formerly task::park().unpark()) can be called to force Tokio to call the future again on the next turn of the loop.

Hopefully this helps? (I think I got that right…)


#4

Thank you very much for the reply!

Let me confirm: say I want to create my own future that mimics the TcpStream provided by tokio. Does that mean I should include the registration logic inside the poll method, so that an executor such as Core::new().unwrap().run(...) will run poll at least once?

If poll returns Async::NotReady, the executor will halt the task and execute another one. Later, when the resource is ready, the executor will be notified and wake the task up. Right?

I have to admit that I’m not familiar with mio either. So my question is: how is the executor notified?

AFAIK, to register an interest, we need to do something like:

let poll = Poll::new().unwrap();
let socket = TcpStream::connect(&"216.58.193.100:80".parse().unwrap()).unwrap();

// Register the socket with `poll`
poll.register(&socket, Token(0), Ready::all(), PollOpt::edge()).unwrap();

So that later we can get notified when we poll on the Poll:

poll.poll(&mut events, Some(remaining)).unwrap();

But if we create the event loop in tokio via let mut core = Core::new().unwrap();, we are creating a new event loop (Poll object). My assumption is that a registered interest will notify the event loop it is registered with. That means that for a TcpStream to notify the executor, it has to register its interest with the executor’s own event loop. Yet I cannot find evidence of this in the source code.

TL;DR: it seems that TcpStream should use the event loop (Poll) created by the executor to register its interest. Could you link to some code that actually does this? Thanks!


#5

Let me see if I can clarify some things here. You generally won’t need to explicitly register any interest in the socket/FD (TcpStream); Tokio will do that for you. What you’re interested in is the Tokio version of TcpStream, which takes a Handle and returns a non-blocking TcpStream that can be polled.

When working directly with TcpStream at this level, it becomes your responsibility to poll the TcpStream properly in your own poll():

fn poll(&mut self) -> Poll<DataToReturn, io::Error> {
    // sending...
    if self.is_sending() {
        match self.tcp_stream.poll_write() {
            Async::Ready(()) => (), // continue
            Async::NotReady => return Ok(Async::NotReady),
        }

        // this actually needs a match on the result too, but for simplicity:
        self.tcp_stream.write(&self.some_buf)?;
        // See also AsyncWrite::write_buf(...), which can be used to send a buffer...
    }

    // reading
    if self.is_reading() {
        match self.tcp_stream.poll_read() {
            Async::Ready(()) => (), // continue
            Async::NotReady => return Ok(Async::NotReady),
        }

        // this also needs a match on the result, but for simplicity:
        self.tcp_stream.read(&mut self.buf)?;
        // again, see AsyncRead::read_buf(...), which simplifies this.
    }

    // if you got data (code not present), transform it
    Ok(Async::Ready(DataToReturn::from_buf(&self.buf)))
}

If you want a larger example, take a look at my TcpStream for DNS packets: https://github.com/bluejekyll/trust-dns/blob/master/client/src/tcp/tcp_stream.rs#L194

I wrote this before a lot of the new Tokio stuff stabilized. It’s still correct, but there are higher-order features now that make some of this boilerplate and state management unnecessary, like the AsyncRead and AsyncWrite interfaces. You should also look at codec::Framed, and the codec::Decoder and Encoder traits, which abstract away some more boilerplate. I have plans to upgrade my implementations to use these, but haven’t gotten around to it.

I hope this helps! Also, I assume you have come across https://tokio.rs/, which has a ton more examples and documentation.


#6

mio in Rust is built on low-level operating system primitives such as select/epoll/kqueue. It provides a special polling function that blocks the thread without wasting CPU doing nothing. When the operating system notices that one of the registered file descriptors (socket or pipe) is ready for read or write, it will interrupt the polling function and tell the polling function which file descriptor(s) are ready. The event loop (Core) is basically a while loop that repeatedly calls poll and dispatches to the appropriate tasks.

Whenever you initiate I/O through the Tokio-specific functions (e.g. any of the tokio::io::* functions), it will register the file descriptor with mio along with some info to indicate which task is waiting on this file descriptor (“unparking information”). Then, it returns NotReady and the task is stashed away (“parked”). The Core will not bother dispatching this task until the mio polling function indicates that at least one of the file descriptors of the task is ready. This is the “unparking mechanism” for Tokio Core.
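The park/unpark cycle described above can be sketched in std-only Rust. This is a deliberately naive model, not tokio's implementation: "readiness" is just a shared flag flipped by another thread, and "parking" is a short sleep, whereas real tokio blocks inside mio's polling function. All names are invented for illustration.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

// Toy model of the Core loop: one "task" stays parked until its readiness
// flag is set, then it is dispatched again and can complete.
fn run_toy_event_loop(ready: Arc<AtomicBool>) -> &'static str {
    loop {
        if ready.load(Ordering::SeqCst) {
            // dispatch: the task's poll() would now return Ready
            return "task completed";
        }
        // "park": real tokio blocks in mio's Poll::poll here instead
        // of sleeping, so it wakes exactly when the OS reports an event
        thread::sleep(Duration::from_millis(1));
    }
}

fn main() {
    let ready = Arc::new(AtomicBool::new(false));
    let ready2 = ready.clone();
    // Simulate the OS reporting readiness on a registered file descriptor.
    thread::spawn(move || {
        thread::sleep(Duration::from_millis(10));
        ready2.store(true, Ordering::SeqCst);
    });
    println!("{}", run_toy_event_loop(ready));
}
```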


#7

From my understanding, lurking on gitter.im/tokio-rs/tokio: if you’re creating an asynchronous (I/O or otherwise) primitive, your poll function must register (with something) to be notified when your primitive becomes ready, and only then return Async::NotReady. Otherwise, your future is never “unparked”.

tokio_core::net::TcpListener delegates [1] [2] to tokio_core::reactor::PollEvented to register a notification before returning an Async::NotReady.

[1]: https://github.com/tokio-rs/tokio-core/blob/master/src/net/tcp.rs#L72
[2]: https://github.com/tokio-rs/tokio-core/blob/master/src/net/tcp.rs#L79

The excellent comments for tokio_core::reactor::PollEvented [3] should provide the details you need.

[3]: https://github.com/tokio-rs/tokio-core/blob/master/src/reactor/poll_evented.rs

If you’re only running a computation, you can do it in a future, but it will block the event loop until it runs to completion. The only way to yield execution that I’ve seen is to call futures::task::park().unpark() to delay execution until the next “tick” of the event loop.

There are layers beneath the tokio::net::TcpStream. If you don’t use them, then yes, you’ll be implementing the registration logic. If you reuse them (e.g. PollEvented), then no, they’ll provide the API you use to read data asynchronously and take care of the registration for you.

You might also skim through the code for mysql_async which defines its own futures for managing database connections and connection pools. The Pool source code [4] demonstrates parking a GetConn future when all connections are in use. The Pool holds a list of parked tasks waiting for connections, and as the Core event loop completes its futures, the connections held within are released back to the Pool via a function call [5] that will unpark the tasks that were waiting for connections.

[4]: https://github.com/blackbeam/mysql_async/tree/master/src/conn/pool
[5]: https://github.com/blackbeam/mysql_async/blob/master/src/conn/pool/mod.rs#L151

Also, note that with version 0.1.14, the futures crate renamed park and unpark in favor of new terminology [6].

[6]: https://github.com/alexcrichton/futures-rs/blob/master/CHANGELOG.md


#8

Thanks for all the replies. After digging into more of the source code, I think I understand now. I’d like to write up what I’ve learned here, in the hope that it helps others.

Correct me if I am wrong.

Players

In order to understand the design of futures-rs and tokio-rs, we need to look at 3 players:

  1. futures. The component that describes some operation to be completed. To implement a custom future, we need to implement the poll method, which is responsible for asynchronously checking the resource the future needs.
  2. task. A task is a future that is being executed. We only need to know that it is a wrapper over a future that provides park() and unpark() (as @boxofrox pointed out, the new terminology is current() and notify()) to halt and wake up the execution of that future.
  3. executor. An event loop that monitors the resources futures are waiting for and wakes up the corresponding task when a resource becomes ready. It eventually calls the future’s poll method to drive its execution.

Jobs of Players

First, notice that task is not the main character here, so we’ll mostly ignore it.

There are two jobs to be done in order for the whole system to work:

  1. Register the resources (e.g. file descriptors) to be monitored with the event loop.
  2. Schedule (i.e. halt/wake up) tasks according to their state (ready/not ready).

For registering resources: the executor obviously cannot know what resources the futures need, so it is not possible for the executor to do this job. It is therefore the futures that must register the resources that need to be monitored.

We can infer that futures and executors must come in pairs. That means that if we have another executor that is not based on mio, the futures provided by tokio cannot be used with it without modification, because the registration logic requires the futures to know what events the executor supports and how registration is done.

For scheduling tasks: wake up. Since it is the executor that gets the notification, it is the executor’s job to wake up a task (maybe that’s why unpark was renamed to notify?).

For scheduling tasks: halt. Technically, I would say there is no explicit method call to “halt a task”. Once the future’s poll method returns, the future gives away control, and can thus be considered “halted”.

How are the jobs done?

Note that how the above jobs are handled is implementation-specific. Here we focus on tokio’s implementation, which in my opinion makes things harder to understand.

Resource registration

We already know that the event loop is created by the executor, and that the futures are responsible for registering their interest in notifications. Let’s start with tokio’s example:

    let mut core = Core::new().unwrap();
    let handle = core.handle();

    // Bind the server's socket
    let addr = "127.0.0.1:12345".parse().unwrap();
    let tcp = TcpListener::bind(&addr, &handle).unwrap();

Here Core::new() creates a new event loop behind the scenes, and core.handle() returns a handle for accessing the event loop later, for example to register interests.

Notice that TcpListener accepts a handle in its arguments. If we check the source code:

  • TcpListener::bind(...) creates a new PollEvented by calling PollEvented::new(..., handle)
  • which in turn creates a tokio_core::io_token::IoToken by calling IoToken::new(..., handle)
  • which calls handle.add_source(...) to register the socket created by the TcpListener as an interest in the Core executor’s event loop

tokio_core::net::TcpListener delegates to tokio_core::reactor::PollEvented to register a notification before returning an Async::NotReady.

What @boxofrox said here is partially wrong, in the sense that the registration is actually done in the bind method, not in the poll method. But the above quote is also right, because if we look closely at the implementation of add_source:

        try!(self.io.register(source,
                              mio::Token(TOKEN_START + entry.index() * 2),
                              mio::Ready::readable() |
                                mio::Ready::writable() |
                                platform::all(),
                              mio::PollOpt::edge()));

We find that it registers for both readable() and writable() events! That means our future would listen for both read and write events. That cannot be right, and thus PollEvented needs to correctly “register” the interest in its poll_read and poll_write methods.

It is an implementation detail that PollEvented re-registers the interest on every call of its poll_* methods. In an ideal world, the registration could be done only once.

Also note that handle.add_source(...) is provided by the executor, and it supports only I/O resources. There is no good interface in tokio for registering custom events, AFAIK.

Finally, we reach the conclusion that an executor should provide interfaces for registering events, but this is not standardized. Thus executors and futures come in pairs: a future written for one executor cannot be used with another executor directly.

Scheduling for tasks

The ideal case would be straightforward: the executor receives a notification and “notifies” the task/future to run. If the poll method returns Async::NotReady, the executor can go on to run other tasks (i.e. there is no explicit halt).

Here we also need to consider: how can the executor schedule the first run of a future?

Core::new() fires an event upon creation, so that the future run by Core::run(..) is executed at least once.

Similarly, when registering tasks via handle.spawn(...), the executor fires a default event to ensure the first run.
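This “guaranteed first run” can be sketched as a toy model (all names invented; these are not tokio’s actual types): spawning unconditionally pushes the task onto the run queue, so it gets polled at least once even before any I/O event fires.

```rust
use std::collections::VecDeque;

// Toy model of "spawn guarantees a first poll".
struct ToyExecutor {
    run_queue: VecDeque<u64>, // ids of tasks scheduled to be polled
}

impl ToyExecutor {
    fn spawn(&mut self, task_id: u64) {
        // like handle.spawn(...): schedule an initial run unconditionally,
        // without waiting for any resource to become ready
        self.run_queue.push_back(task_id);
    }

    // one turn of the loop: pop and "poll" the next scheduled task
    fn turn(&mut self) -> Option<u64> {
        self.run_queue.pop_front()
    }
}

fn main() {
    let mut ex = ToyExecutor { run_queue: VecDeque::new() };
    ex.spawn(1);
    assert_eq!(ex.turn(), Some(1)); // task 1 is polled once, no I/O needed
    assert_eq!(ex.turn(), None);    // nothing else scheduled
    println!("ok");
}
```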

In the ideal case, that’s all we need to know. Yet tokio has some special behavior, as described previously: add_source(...) registers for both readable and writable by default.

As @boxofrox points out:

in your poll function you must register (with something) to be notified when your primitive becomes ready, and only then do you return Async::NotReady. Otherwise, your future is never “unparked”.

That’s because handle.add_source(...) by default does not associate the read and write events with any task’s callback. The effect of calling PollEvented::poll_read or PollEvented::poll_write is actually to send an event to the executor asking it to register the correct callback, so that later, when the resource is ready, the future will be polled again.
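The register-before-NotReady pattern can be sketched as a toy model (all names invented; the real PollEvented talks to the reactor through its token machinery): poll_read checks a readiness flag, and if the resource is not ready, it records the current task’s id before returning NotReady, so the event loop knows whom to wake later.

```rust
use std::collections::VecDeque;

#[derive(Debug, PartialEq)]
enum Readiness {
    Ready,
    NotReady,
}

// Toy model of a PollEvented-style resource plus its reactor bookkeeping.
struct ToyReactor {
    readable: bool,
    read_waiters: VecDeque<u64>, // tasks waiting for this resource
}

impl ToyReactor {
    fn poll_read(&mut self, task_id: u64) -> Readiness {
        if self.readable {
            Readiness::Ready
        } else {
            // the crucial step: register interest BEFORE returning
            // NotReady, otherwise nobody will ever wake this task
            self.read_waiters.push_back(task_id);
            Readiness::NotReady
        }
    }

    // called by the event loop when the OS reports readiness;
    // returns the ids of the tasks that should now be notified
    fn set_readable(&mut self) -> Vec<u64> {
        self.readable = true;
        self.read_waiters.drain(..).collect()
    }
}

fn main() {
    let mut r = ToyReactor { readable: false, read_waiters: VecDeque::new() };
    assert_eq!(r.poll_read(7), Readiness::NotReady); // task 7 is parked
    assert_eq!(r.set_readable(), vec![7]);           // task 7 gets notified
    assert_eq!(r.poll_read(7), Readiness::Ready);    // the next poll succeeds
    println!("ok");
}
```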

Beyond IO

What I want to mention finally is that tokio is a special case; it has special behaviors of its own, as described above. So it might be a bit misleading to apply tokio-specific knowledge to the general future/executor architecture. For example, the futures-model tutorial says:

However polling begins, if any of the interior futures produced a NotReady result, it can grind the whole task to a halt—the task may need to wait for some event to occur before it can continue. In synchronous I/O, this is where a thread would block. Tasks provide an equivalent to this model: the task “blocks” by yielding back to its executor, after installing itself as a callback for the events it’s waiting on.

But is it the task’s job to install itself as a callback for the events it’s waiting on? I believe it is actually the future’s job. And when does the installation happen? I believe it could happen somewhere other than when “polling begins”.

For tokio it might be right to say all this, but it may not apply in the more general case.


#9

Very nice read! I learned a thing or two from this thread. I think your last post would make a nice blog post (if you have one…); here on the forum it will eventually be buried by other, newer threads.
We could really use more resources around futures, especially for beginners. This kind of “today I learned” post is extremely useful to other newcomers! (Thanks for the write-up either way, even if you don’t blog it :slight_smile:)


#10

But is it the task’s job to install itself as a callback for the events it’s waiting on? I believe it is actually the future’s job. And when does the installation happen? I believe it could happen somewhere other than when “polling begins”.

You don’t want to register before Future::poll() is called for the first time, but you could register some time later, perhaps after another event has occurred. (The word “begins” is blurry here: the “beginning” of a poll() call for one future could be the end of another future’s, since multiple futures can be stitched together.)


#11

Is there any reason for this? The one I can think of is to avoid events happening before the executor runs, in which case the future would miss some events. Is that right?


#12

The notion of a “task” only makes sense while a future is being polled. So if you try to register an event during the creation of a future, it will either fail because no task is running, or end up registering the wrong task (the task that happens to be creating the future).


#13

Thank you for pointing this out! I hadn’t thought much about the “task” until now.

I started wondering where this restriction comes from: does it come from the design of futures-rs or from tokio’s implementation? Below are my investigations:

A “task” is the unit of callback that an executor can accept, so a registration of interest should be a pair of (resource, task). That means that when we want to register an interest, we need to turn a future into a “task” to hand to the executor, which means the future passes its ownership to the “task” and can no longer be used to compose “future chains”.

impl CustomFuture {
    pub fn new(handle: &Handle) -> ... {
        let resource = create_resource(...);
        let future = CustomFuture { ... };

        handle.register_interest(resource, future);
        future // not going to work: the future's ownership was already given away
    }
}

If the above assumption is true, I believe we can still bypass this:

Theoretically, we could create the future and register the interest right away, before “the future is being polled”. Of course, this requires the executor to provide utilities for registering interest directly, like this:

let mut core = Core::new().unwrap();
let handle = core.handle();
let future = CustomFuture::new(...);
handle.register_interest(future.get_resource(), future);
// the interest is registered here, but the future is not yet polled.

Please point out if anything is wrong.


#14

A future is just a recipe. It should never try to do anything upon creation. It’s a static entity, much like a program. Programs can be combined to build larger programs, and likewise futures can be combined to build larger futures through the various combinators. They remain inert until you actually try to run them.

A task is like a thread – a “green” thread to be precise. It represents a thing that is currently happening, actively running. A task has an identity (e.g. a unique ID), whereas futures are completely anonymous – you can’t identify them even if you try; it’s just not part of the design.

Conceptually, a Future is nothing more than an ordinary function object (unboxed closure, to be precise) with the signature:

FnMut(Task) -> Poll<T, E>

That’s it.

If you squint a bit, it looks similar to the type signature of Future::poll, except poll doesn’t seem to ask for a Task object. It turns out poll is just being dishonest: it uses a thread-local variable behind the scenes to store the current Task object. But that doesn’t change the fact that the poll function requires a Task object to be available in that variable (futures-rs warns: “Additionally, calls to poll must always be made from within the context of a task. If a current task is not set then this method will likely panic.”).

The Task object is kind of a misnomer: it’s not the task itself; it’s only a handle to the task that allows you to retrieve some information about the task, so perhaps “TaskId” would have been a more appropriate name. The actual task itself is stored in a Spawn object, which is created simply by wrapping a future. The event loop’s main responsibility is to maintain a list of these Spawn objects (typically in a queue of some kind).

In order to register event notifications, you need to tell the event loop who you are, otherwise it won’t know which task to wake up! So by necessity, this requires the Task to be available. A lone, nameless Future simply won’t do.
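That “tell the event loop who you are” mechanism can be modeled with a thread-local in plain std Rust. All names here are invented for illustration, and the real futures crate stores an actual Task handle rather than an integer id: the executor stashes the current task before calling poll, and code inside poll reads it back out.

```rust
use std::cell::Cell;

thread_local! {
    // Toy stand-in for futures' hidden thread-local: it is only set
    // while the executor is actively polling some task.
    static CURRENT_TASK: Cell<Option<u64>> = Cell::new(None);
}

// analogous to futures::task::current(): only meaningful inside a poll
fn current_task() -> Option<u64> {
    CURRENT_TASK.with(|t| t.get())
}

// the executor wraps every poll call like this, installing the task's
// identity before the future runs and clearing it afterwards
fn enter_task<R>(task_id: u64, poll: impl FnOnce() -> R) -> R {
    CURRENT_TASK.with(|t| t.set(Some(task_id)));
    let result = poll();
    CURRENT_TASK.with(|t| t.set(None));
    result
}

fn main() {
    // outside of any poll there is no current task -- this is why the
    // real futures::task::current() panics when called outside a task,
    // and why registering interest at future-creation time cannot work
    assert_eq!(current_task(), None);

    // inside a poll, the future can discover which task is running it
    // and hand that identity to the event loop when registering interest
    let seen = enter_task(7, || current_task());
    assert_eq!(seen, Some(7));
    println!("ok");
}
```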