How do futures/tasks work with executors?

Thanks for all the replies. After digging into more of the source code, I think I understand it now. Here I'd like to summarize what I've learned, in the hope that it helps others.

Correct me if I am wrong.

Players

To understand the design of futures-rs and tokio-rs, we need to look at three players:

  1. future. The component that describes an operation to be completed. To implement a custom future, we need to implement its poll method, which checks, without blocking, whether the resource the future needs is ready.
  2. task. A task is a future that is being executed. All we need to know is that it wraps a future and provides park() and unpark() (as @boxofrox pointed out, the newer names are current() and notify()). The first returns a handle to the task the future is running in; the second uses that handle to wake the task up so that it gets polled again.
  3. executor. An event loop that monitors the resources the futures are waiting for and wakes up the tasks waiting on them accordingly. It is the executor that eventually calls a future's poll method to drive it to completion.
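To make the three roles concrete, here is a minimal sketch. It uses today's std::future API rather than the futures 0.1 API discussed in this post: Poll::Pending plays the role of Async::NotReady, and the Waker handed to poll roughly plays the role of the Task returned by task::current(). The names Countdown, ThreadWaker and block_on are hypothetical, not part of futures-rs or tokio.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Player 1, the future (hypothetical): returns Pending a few times, then completes.
struct Countdown {
    remaining: u32,
}

impl Future for Countdown {
    type Output = &'static str;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if self.remaining == 0 {
            Poll::Ready("done")
        } else {
            self.remaining -= 1;
            // There is no real resource here, so the future asks to be woken at once.
            // A real I/O future would hand cx.waker() to the event loop instead.
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

/// Player 2, the task handle (hypothetical): waking it unparks the executor's thread,
/// similar in spirit to futures 0.1's task::current() / Task::notify().
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Player 3, the executor (hypothetical): polls the future and parks the thread
/// between polls. Returns the output and how many times poll was called.
fn block_on<F: Future>(fut: F) -> (F::Output, u32) {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    let mut polls = 0;
    loop {
        polls += 1;
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return (out, polls),
            // Not ready: sleep until some waker unparks this thread.
            Poll::Pending => thread::park(),
        }
    }
}
```

For example, block_on(Countdown { remaining: 3 }) returns ("done", 4): three Pending results followed by one Ready.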

Jobs of Players

First, note that the task is not the main character here, so we'll set it aside.

There are two jobs to be done in order for the whole system to work:

  1. Register the resources (e.g. file descriptors) to be monitored with the event loop.
  2. Schedule (i.e. halt/wake up) the task according to its state (ready/not ready).

For registering resources: the executor obviously cannot know which resources the futures need, so it cannot do this job itself. It is therefore reasonable for the futures to register the resources they need to monitor.
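A toy sketch of this division of labour, again with std::future and with hypothetical names (Reactor, WaitFor, CountingWaker): the event loop only maps resource ids to wakers and never knows what a future is waiting for; the future itself registers its interest inside poll, before returning Pending.

```rust
use std::cell::RefCell;
use std::collections::{HashMap, HashSet};
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical event loop: it only knows resource ids, not what they mean.
#[derive(Default)]
struct Reactor {
    ready: HashSet<u32>,
    interests: HashMap<u32, Waker>,
}

impl Reactor {
    /// Called by a future: "wake this task when `id` becomes ready".
    fn register(&mut self, id: u32, waker: Waker) {
        self.interests.insert(id, waker);
    }

    /// Called when the OS (simulated here) reports that `id` is ready.
    fn event(&mut self, id: u32) {
        self.ready.insert(id);
        if let Some(w) = self.interests.remove(&id) {
            w.wake();
        }
    }
}

/// A future waiting on one resource; it, not the executor, knows which id it needs.
struct WaitFor {
    id: u32,
    reactor: Rc<RefCell<Reactor>>,
}

impl Future for WaitFor {
    type Output = u32;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<u32> {
        let mut r = self.reactor.borrow_mut();
        if r.ready.remove(&self.id) {
            Poll::Ready(self.id)
        } else {
            // Register interest *before* returning Pending, or nobody will wake us.
            r.register(self.id, cx.waker().clone());
            Poll::Pending
        }
    }
}

/// Waker that only counts wake-ups, so the effect is observable.
struct CountingWaker(AtomicUsize);

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

/// Poll once, fire the event, poll again: (first poll, wake count, second poll).
fn demo() -> (Poll<u32>, usize, Poll<u32>) {
    let reactor = Rc::new(RefCell::new(Reactor::default()));
    let counter = Arc::new(CountingWaker(AtomicUsize::new(0)));
    let waker = Waker::from(counter.clone());
    let mut cx = Context::from_waker(&waker);
    let mut fut = WaitFor { id: 7, reactor: reactor.clone() };

    let first = Pin::new(&mut fut).poll(&mut cx); // Pending; interest registered
    reactor.borrow_mut().event(7); // the "OS" reports id 7 ready, which wakes the task
    let wakes = counter.0.load(Ordering::SeqCst);
    let second = Pin::new(&mut fut).poll(&mut cx); // Ready(7)
    (first, wakes, second)
}
```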

We can infer that futures and executors have to come in pairs. That means that if we had another executor not based on mio, the futures provided by tokio could not be used with it without modification, because the registration logic requires the futures to know which events the executor supports and how registration is done.

For scheduling tasks, waking up: since it is the executor that gets notified, waking up a task should be the executor's job (maybe that's why unpark was renamed to notify?).

For scheduling tasks, halting: technically, I would say there is no explicit method call that "halts a task". Once the future's poll method returns, the future has given away its control and is thus considered "halted".

How are the jobs done?

Note that how the above jobs are handled is implementation-specific. Here we focus on tokio's implementation, which in my opinion makes things harder to understand.

Resource registration

We already know that the event loop is created by the executor, and that the futures are responsible for registering their interest in notifications. Let's start with tokio's example:

    use tokio_core::net::TcpListener;
    use tokio_core::reactor::Core;

    let mut core = Core::new().unwrap();
    let handle = core.handle();

    // Bind the server's socket
    let addr = "127.0.0.1:12345".parse().unwrap();
    let tcp = TcpListener::bind(&addr, &handle).unwrap();

Here Core::new() creates a new event loop behind the scenes, and core.handle() returns a handle for accessing that event loop later, for example to register interests.

Notice that TcpListener::bind takes a handle as one of its arguments. If we check the source code:

  • TcpListener::bind(...) creates a new PollEvented by calling PollEvented::new(..., handle).
  • PollEvented::new in turn creates a tokio_core::io_token::IoToken by calling IoToken::new(..., handle).
  • IoToken::new calls handle.add_source(...) to register the socket created by TcpListener with the Core executor's event loop.
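The Core/Handle split and the bind, PollEvented::new, IoToken::new, add_source chain can be mimicked in a few lines. This is a simplified, hypothetical model that borrows tokio-core's names for readability; it is not tokio-core's actual API (no sockets, no mio, and Option instead of io::Result). It shows that registration happens when the listener is constructed, and that the handle is just a cheap reference back to the event loop (a Weak in this sketch, so a handle cannot keep the loop alive).

```rust
use std::cell::RefCell;
use std::rc::{Rc, Weak};

/// Hypothetical event-loop state: the registered sources.
/// A source's index in the list serves as its token.
#[derive(Default)]
struct Inner {
    sources: Vec<String>,
}

/// Owns the event loop (in this toy, just the registry).
struct Core {
    inner: Rc<RefCell<Inner>>,
}

/// Cheap, clonable reference back to the event loop, like core.handle().
#[derive(Clone)]
struct Handle {
    inner: Weak<RefCell<Inner>>,
}

impl Core {
    fn new() -> Core {
        Core { inner: Rc::new(RefCell::new(Inner::default())) }
    }

    fn handle(&self) -> Handle {
        Handle { inner: Rc::downgrade(&self.inner) }
    }

    fn registered(&self) -> usize {
        self.inner.borrow().sources.len()
    }
}

impl Handle {
    /// Stand-in for add_source: register a source and get its token back.
    /// Returns None if the Core has already been dropped.
    fn add_source(&self, name: &str) -> Option<usize> {
        let core = self.inner.upgrade()?;
        let mut inner = core.borrow_mut();
        inner.sources.push(name.to_string());
        Some(inner.sources.len() - 1)
    }
}

/// Each layer just wraps the one below, mirroring the chain in the list above.
struct IoToken {
    token: usize,
}

impl IoToken {
    fn new(name: &str, handle: &Handle) -> Option<IoToken> {
        Some(IoToken { token: handle.add_source(name)? })
    }
}

struct PollEvented {
    token: IoToken,
}

impl PollEvented {
    fn new(name: &str, handle: &Handle) -> Option<PollEvented> {
        Some(PollEvented { token: IoToken::new(name, handle)? })
    }
}

struct TcpListener {
    io: PollEvented,
}

impl TcpListener {
    /// Registration happens here, at construction time, not in poll.
    fn bind(addr: &str, handle: &Handle) -> Option<TcpListener> {
        Some(TcpListener { io: PollEvented::new(addr, handle)? })
    }

    fn token(&self) -> usize {
        self.io.token.token
    }
}
```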

As @boxofrox wrote:

"tokio_core::net::TcpListener delegates to tokio_core::reactor::PollEvented to register a notification before returning an Async::NotReady."

What @boxofrox said here is partially wrong, in the sense that the registration is actually done not in the poll method but in the bind method. But the quote above is still right, as we see if we look closely at the implementation of add_source:

        try!(self.io.register(source,
                              mio::Token(TOKEN_START + entry.index() * 2),
                              mio::Ready::readable() |
                                mio::Ready::writable() |
                                platform::all(),
                              mio::PollOpt::edge()));

We find that it registers for both readable() and writable() events! That means our future would listen to both read and write events. That cannot be right, so PollEvented needs to correctly "register" the interest in its poll_read and poll_write methods.

It is an implementation detail that PollEvented registers the interest on every call to poll_read/poll_write. In an ideal world, the registration could be done only once.
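One way to picture this, as a simplified and hypothetical model rather than tokio-core's code: the OS-level registration covers both directions once, but the event loop keeps a separate reader slot and writer slot per source, and poll_read/poll_write refresh only the slot they care about on every call. A writable event then never wakes a task that is only waiting to read.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Wake, Waker};

const READABLE: u8 = 0b01;
const WRITABLE: u8 = 0b10;

/// Hypothetical per-source state kept by the event loop.
struct Source {
    /// What the OS-level registration covers: both directions, set once.
    os_interest: u8,
    /// Readiness seen so far.
    readiness: u8,
    /// Task to wake on readability / writability, if one asked.
    reader: Option<Waker>,
    writer: Option<Waker>,
}

impl Source {
    fn new() -> Source {
        Source { os_interest: READABLE | WRITABLE, readiness: 0, reader: None, writer: None }
    }

    /// Like poll_read: report readiness, or (re-)record who to wake.
    /// Called on every poll, so the slot is refreshed each time.
    fn poll_read(&mut self, waker: &Waker) -> bool {
        if (self.readiness & READABLE) != 0 {
            return true;
        }
        self.reader = Some(waker.clone());
        false
    }

    /// Like poll_write, for the other direction.
    fn poll_write(&mut self, waker: &Waker) -> bool {
        if (self.readiness & WRITABLE) != 0 {
            return true;
        }
        self.writer = Some(waker.clone());
        false
    }

    /// The event loop received an OS event for this source.
    fn dispatch(&mut self, event: u8) {
        let event = event & self.os_interest;
        self.readiness |= event;
        if (event & READABLE) != 0 {
            if let Some(w) = self.reader.take() {
                w.wake();
            }
        }
        if (event & WRITABLE) != 0 {
            if let Some(w) = self.writer.take() {
                w.wake();
            }
        }
    }
}

/// Waker that counts how often it was woken.
struct Counter(AtomicUsize);

impl Wake for Counter {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}
```

A task that called poll_read and got false is not woken by dispatch(WRITABLE), only by dispatch(READABLE).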

Also note that handle.add_source(...) is provided by the executor and supports only I/O resources. AFAIK, tokio has no good interface for registering custom events.

Finally, we conclude that the executor should provide interfaces for registering events, but these interfaces are not standardized. Thus executors and futures come in pairs: a future written for one executor cannot be used directly with another.

Scheduling for tasks

The ideal case would be straightforward: the executor receives a notification and "notifies" the task/future to run. If the poll method returns Async::NotReady, the executor can go on to run other tasks (i.e. there is no explicit halt).

Here we need to consider two things. First, how does the executor schedule the first run of a future?

Core::new() fires an event on creation, so the future run by Core::run(..) is guaranteed to be executed at least once.

Similarly, when tasks are spawned with handle.spawn(...), the executor fires a default event to ensure their first run.
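A minimal single-threaded executor sketch, using std::future and hypothetical names (Executor, TaskWaker, YieldNow): spawning pushes the task onto a ready queue, which plays the role the post attributes to the default event and guarantees a first poll; waking pushes the task back; and a task that returned Pending simply stays out of the queue, with no explicit halt, while the executor runs other tasks.

```rust
use std::cell::RefCell;
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

type BoxFuture = Pin<Box<dyn Future<Output = ()>>>;

/// Waking a task means pushing its id back onto the executor's ready queue.
struct TaskWaker {
    id: usize,
    queue: Arc<Mutex<VecDeque<usize>>>,
}

impl Wake for TaskWaker {
    fn wake(self: Arc<Self>) {
        self.queue.lock().unwrap().push_back(self.id);
    }
}

#[derive(Default)]
struct Executor {
    tasks: Vec<Option<BoxFuture>>,
    queue: Arc<Mutex<VecDeque<usize>>>,
}

impl Executor {
    /// Spawning queues the task immediately, so it is polled at least once.
    fn spawn(&mut self, fut: impl Future<Output = ()> + 'static) {
        let fut: BoxFuture = Box::pin(fut);
        self.tasks.push(Some(fut));
        self.queue.lock().unwrap().push_back(self.tasks.len() - 1);
    }

    /// Poll queued tasks until none is ready. A task that returned Pending is
    /// not queued again until something calls its waker.
    fn run(&mut self) {
        loop {
            // Hold the lock only briefly: a task may wake itself while being polled.
            let next = self.queue.lock().unwrap().pop_front();
            let id = match next {
                Some(id) => id,
                None => break,
            };
            // Skip ids of tasks that have already completed.
            let fut = match self.tasks[id].as_mut() {
                Some(fut) => fut,
                None => continue,
            };
            let waker = Waker::from(Arc::new(TaskWaker { id, queue: self.queue.clone() }));
            let mut cx = Context::from_waker(&waker);
            if fut.as_mut().poll(&mut cx).is_ready() {
                self.tasks[id] = None;
            }
        }
    }
}

/// Returns Pending once (after asking to be woken), then Ready.
struct YieldNow(bool);

impl Future for YieldNow {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            Poll::Ready(())
        } else {
            self.0 = true;
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

/// Spawn two tasks that each yield once and record the order they ran in.
fn demo() -> Vec<String> {
    let log: Rc<RefCell<Vec<String>>> = Rc::new(RefCell::new(Vec::new()));
    let mut ex = Executor::default();
    for name in ["a", "b"] {
        let log = log.clone();
        ex.spawn(async move {
            log.borrow_mut().push(format!("{name}1"));
            YieldNow(false).await;
            log.borrow_mut().push(format!("{name}2"));
        });
    }
    ex.run();
    let out = log.borrow().clone();
    out
}
```

Because a Pending task gives control back to the executor, the two tasks interleave: demo() returns ["a1", "b1", "a2", "b2"].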

In the ideal case, that's all we need to know. But tokio has a special setup, as described earlier: add_source(...) registers for both readable and writable events by default.

As @boxofrox points out:

in your poll function you must register (with something) to be notified when your primitive becomes ready, and only then do you return Async::NotReady. Otherwise, your future is never "unparked".

That's because handle.add_source(...) does not, by itself, register the task to be notified on read or write events. Calling PollEvented::poll_read or PollEvented::poll_write is what sends a message to the executor asking it to register the task for the right event, so that when the resource later becomes ready, the future is polled again.

Beyond IO

Finally, I want to mention that tokio is a special case, with its own configuration as described above. So it can be a bit misleading to apply what we learn from tokio to the general future/executor architecture. For example, the futures-model tutorial says:

However polling begins, if any of the interior futures produced a NotReady result, it can grind the whole task to a halt—the task may need to wait for some event to occur before it can continue. In synchronous I/O, this is where a thread would block. Tasks provide an equivalent to this model: the task “blocks” by yielding back to its executor, after installing itself as a callback for the events it’s waiting on.

But is it really the task's job to install itself as a callback for the events it's waiting on? I believe it is actually the future's job. And when does the installation happen? I believe it could happen somewhere other than where "polling begins".

In tokio, all of this may be right, but it may not apply in the general case.
