Futures and Tasks

Edit: Hmm... The more I think about it the more I realize I probably have some very fundamental misunderstandings of what tasks are.

I'm working on a project that requires me to hash some files of arbitrary size, and I decided to try to do it using a custom file hasher future. See the thread "Looking for threaded/async tips for a specific problem (handing over writer socket)".

In that thread @alice showed how easily one can make sure custom futures are executed (simply instantiate and call await in the context of an executor) -- but I want to be able to run more than one hash future in parallel. Thinking about this made me realize I'm not quite sure how these things work, and I think I may need some clarifications.

I use libevent quite a bit in C/C++, and while I realize async/futures aren't really a 1:1 equivalent to what libevent does, there are some important similarities, and I'd like to use what I know about libevent to see if I understand async/futures correctly. I'm hoping that this post, and any potential corrections to it, can be helpful to others coming from libevent/libev to Rust.

With libevent one creates a base object, and within it one creates event objects. The event objects have callback functions that are registered to specific events, like "read", "write" and "other". Once the OS detects an event that the application has asked to handle, libevent calls the appropriate event handler function.

The way I see it the async/futures framework in Rust is a generalization of what libevent does, but it isn't limited to the things libevent supports, and it's also not an isolated library like libevent is. But am I correct in assuming that:

An "executor" is more or less the dispatch loop in libevent.

A "future" (implementation) is a system which allows arbitrary events to be waited on. libevent is hard coded to support timers, kqueue, /dev/epoll, et al, i.e. it will always be limited to those events. A custom future can support those subsystems but can in addition do anything else (it could for instance be used to act on raw GPIO input in an embedded system).

A "future" (instantiation) is much like event contexts/handlers in libevent, but with the important difference that futures allow the language to create a state machine. That state machine lets waits occur at any place in the async context while the dispatcher thread returns to the main dispatch loop to process another future, and later resumes the first future where it left off.

And this is where it gets a little confusing to me: What is a "task"? Are tasks what are actually "executed" by the executors? So let's say in my case that I want to be able to hash four files in parallel, does this mean that I have to instantiate four different futures and place them within four separate tasks and allow the executor to run?

In a sense, should one see "tasks" as the actual libevent event callback registrations?

I think much of my confusion could be cleared up if someone could tell me if this is accurate:

If a future A creates a new future B as soon as A is completed (within the same task), then we've created a sequential flow which transitions from A to B. But if A and B are created at the same time and placed within two different tasks, then the two will be run in parallel (well, in the order the executor decides to run them, but assuming both futures launch long-running threads).

And in my case, if I have multiple hashers running simultaneously -- how do I get their individual return values? The HasherFuture returns a struct that contains the file's hash, but if there are four of them running at the same time, how do I get the next one to complete without blocking the others? I.e. I don't want to wait for a specific one to complete, I want to wait until any of them finishes (the one that completes first), and then I want to extract its return value so I can send the result along to another process. Is this something that the particular executor one uses has to support?

It became clear to me that I don't have a good grasp on how these things work when I realized that I can't simply do something akin to:

  let result = collection_of_futures.wait_for_any();
  println!("hash: {}", result.hash);

... because any future could be running, and there's no way to know the type of the result variable.

Let's go through a bit of terminology:

Future: A future is an object that stores the state of some operation, along with a poll method that performs a bit more work and then returns. A future additionally has some mechanism for notifying a waker when the future is ready to resume work. When the future eventually completes, the poll method will return the completed value.
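To make that concrete, here is a minimal hand-written future, polled by hand with a waker that does nothing. This is only a sketch: Countdown, NoopWake and polls_until_ready are made-up names for illustration, and a real future would hand its waker to some event source instead of waking itself immediately.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical future that returns Pending `remaining` times, then completes.
struct Countdown {
    remaining: u32,
}

impl Future for Countdown {
    type Output = &'static str;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if self.remaining == 0 {
            // The operation is finished: hand back the completed value.
            return Poll::Ready("done");
        }
        // Do "a bit more work", then ask to be polled again.
        self.remaining -= 1;
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}

/// Waker that ignores notifications; enough when we poll in a loop by hand.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls a Countdown by hand and returns how many poll calls it took.
fn polls_until_ready(remaining: u32) -> u32 {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(Countdown { remaining });
    let mut polls = 1;
    while fut.as_mut().poll(&mut cx).is_pending() {
        polls += 1;
    }
    polls
}

fn main() {
    println!("polls needed: {}", polls_until_ready(3));
}
```

Nothing here is specific to files or sockets; the state of the operation is just the remaining field, and poll advances it one step at a time.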

Executor: An executor is a structure containing a whole bunch of futures which are currently being executed. In order to actually execute a future, the executor creates a waker and puts it in a context and calls the poll method on the future with this context as an argument. The executor needs to have some mechanism to receive notifications from the futures when they are ready to continue, and the way this works is that the future keeps a copy of the waker it was given around, and some other piece of code later calls wake on that waker.
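As a rough sketch of that mechanism, the following is about the smallest executor I can think of: it runs a single future on the current thread and sleeps until the waker is called. The names block_on, ThreadWaker and WaitForThread are hypothetical, and real executors are considerably more involved (many tasks, I/O reactors, timers).

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};
use std::thread;
use std::time::Duration;

/// Waker that unparks the thread running the executor loop.
struct ThreadWaker(thread::Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Toy executor: polls one future, parking the thread while it is pending.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(value) => return value,
            // Sleep until "some other piece of code" calls wake.
            Poll::Pending => thread::park(),
        }
    }
}

#[derive(Default)]
struct Shared {
    done: bool,
    waker: Option<Waker>,
}

/// Hypothetical future that is completed by a helper thread.
struct WaitForThread {
    shared: Arc<Mutex<Shared>>,
}

impl Future for WaitForThread {
    type Output = u32;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<u32> {
        let mut shared = self.shared.lock().unwrap();
        if shared.done {
            Poll::Ready(42)
        } else {
            // Keep a copy of the waker so the helper thread can notify us.
            shared.waker = Some(cx.waker().clone());
            Poll::Pending
        }
    }
}

fn wait_for_thread() -> WaitForThread {
    let shared = Arc::new(Mutex::new(Shared::default()));
    let for_thread = Arc::clone(&shared);
    thread::spawn(move || {
        thread::sleep(Duration::from_millis(20));
        let mut shared = for_thread.lock().unwrap();
        shared.done = true;
        if let Some(waker) = shared.waker.take() {
            waker.wake();
        }
    });
    WaitForThread { shared }
}

fn main() {
    println!("{}", block_on(wait_for_thread()));
}
```

In libevent terms, block_on plays the role of the dispatch loop, and the helper thread stands in for the OS reporting that an event is ready.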

Async function: Marking a function async is just syntax sugar. The compiler creates a new anonymous type that implements Future, with a poll implementation such that calling poll repeatedly corresponds to running the body of the function. Calling the async function does not run the body; it just immediately returns a new instance of this anonymous type.

Every future you await in an async function becomes wrapped in this new anonymous future type, and is stored in that type until it completes.
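A small experiment shows that calling an async function only builds the future and that the body runs on the first poll. The names record_and_double, NoopWake and demo are made up for this sketch.

```rust
use std::cell::Cell;
use std::future::Future;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Waker that does nothing; we poll by hand below.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Hypothetical async function that records when its body actually runs.
async fn record_and_double(ran: &Cell<bool>, x: u32) -> u32 {
    ran.set(true);
    2 * x
}

/// Returns (body ran before the first poll, body ran after it, output).
fn demo() -> (bool, bool, u32) {
    let ran = Cell::new(false);

    // Calling the function only creates an instance of the anonymous type.
    let mut fut = Box::pin(record_and_double(&ran, 21));
    let ran_before_poll = ran.get();

    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let output = match fut.as_mut().poll(&mut cx) {
        Poll::Ready(value) => value,
        Poll::Pending => unreachable!("the body has no await points"),
    };
    (ran_before_poll, ran.get(), output)
}

fn main() {
    println!("{:?}", demo());
}
```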

Future combinators: The futures crate provides an extension trait called FutureExt, which allows you to call various methods on variables of future type. One example is map, which wraps a future into a new future that first runs the wrapped future, and then runs the given closure. E.g. if you have a future that resolves to integers, but you want to double the result that the future resolves to:

my_future.map(|result| 2 * result)

Note that this is equivalent to:

async { 2 * my_future.await }

Functions like map are called combinators, because they allow you to combine futures into new futures. The important lesson here is that a future can be wrapped in a bigger future where computing the inner future is part of what the bigger future does. The source code for map is not too complicated, so let's take a look. I specifically want to point out this part:

impl<Fut, F, T> Future for Map<Fut, F>
    where Fut: Future,
          F: FnOnce(Fut::Output) -> T,
{
    type Output = T;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
        self.as_mut()
            .future()
            .poll(cx) // <-- poll is called on the inner future!
            .map(|output| {
                let f = self.f().take()
                    .expect("Map must not be polled after it returned `Poll::Ready`");
                f(output)
            })
    }
}

Notice that calling poll on Map will recursively call poll on whatever future you wrapped in it.

Task: Ultimately a future does nothing if it is never polled, but there are two ways a future ends up polled:

  1. You passed the future to the spawn function of the executor.
  2. The future is wrapped in another future, and this outer future polled the inner future as part of being polled.

When talking about a task, you are referring to a root future, i.e. a future that is polled in the first way. When talking about futures in the same task, we are referring to futures that are wrapped somewhere inside the same root future.
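Here is a toy sketch of that idea, where every entry in the executor's queue is one task, i.e. one root future. ToyExecutor, YieldNow and interleaving are hypothetical names, and the executor simply re-polls pending tasks round-robin and ignores wakers, which a real executor would not do.

```rust
use std::cell::RefCell;
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Future that returns Pending once, giving other tasks a chance to run.
struct YieldNow(bool);

impl Future for YieldNow {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            Poll::Ready(())
        } else {
            self.0 = true;
            Poll::Pending
        }
    }
}

/// Hypothetical toy executor: each queue entry is a task (a root future).
struct ToyExecutor {
    tasks: VecDeque<Pin<Box<dyn Future<Output = ()>>>>,
}

impl ToyExecutor {
    fn new() -> Self {
        ToyExecutor { tasks: VecDeque::new() }
    }

    /// The first way a future ends up polled: it becomes a new task.
    fn spawn(&mut self, fut: impl Future<Output = ()> + 'static) {
        self.tasks.push_back(Box::pin(fut));
    }

    /// Polls the tasks round-robin until all of them have completed.
    fn run(&mut self) {
        let waker = Waker::from(Arc::new(NoopWake));
        let mut cx = Context::from_waker(&waker);
        while let Some(mut task) = self.tasks.pop_front() {
            if task.as_mut().poll(&mut cx).is_pending() {
                self.tasks.push_back(task);
            }
        }
    }
}

/// Spawns two tasks and records the order in which their steps run.
fn interleaving() -> Vec<&'static str> {
    let log: Rc<RefCell<Vec<&'static str>>> = Rc::new(RefCell::new(Vec::new()));
    let mut exec = ToyExecutor::new();
    for (start, end) in [("a start", "a end"), ("b start", "b end")] {
        let log = Rc::clone(&log);
        exec.spawn(async move {
            log.borrow_mut().push(start);
            // YieldNow is polled the second way: through its enclosing task.
            YieldNow(false).await;
            log.borrow_mut().push(end);
        });
    }
    exec.run();
    let result = log.borrow().clone();
    result
}

fn main() {
    println!("{:?}", interleaving());
}
```

The two async blocks are the tasks; the YieldNow inside each of them is a future living in that task.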

How to run multiple futures simultaneously: There are two approaches:

  1. Join the futures using a combinator.
  2. Spawn each future as a new task.

The futures crate provides a function called join, which takes two futures and wraps them in a new future that runs both simultaneously. The resulting future resolves to a tuple with both values once both futures have completed. Note that both futures are still running inside the same task: the poll function of join just polls both futures, and they both end up waking the same task.
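To illustrate what such a combinator does, here is a hand-rolled two-future join built on std::future::poll_fn. This is not the futures crate's implementation, just a sketch of the same idea; join2, yield_times, run and demo are hypothetical names, and run is a busy-polling stand-in for a real executor.

```rust
use std::future::{poll_fn, Future};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Busy-polling stand-in for an executor, good enough for this demo.
fn run<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(value) = fut.as_mut().poll(&mut cx) {
            return value;
        }
    }
}

/// Future that returns Pending `n` times before completing.
fn yield_times(mut n: u32) -> impl Future<Output = ()> {
    poll_fn(move |cx| {
        if n == 0 {
            Poll::Ready(())
        } else {
            n -= 1;
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    })
}

/// Sketch of a join: one outer future whose poll polls both inner futures.
async fn join2<A: Future, B: Future>(a: A, b: B) -> (A::Output, B::Output) {
    let mut a = Box::pin(a);
    let mut b = Box::pin(b);
    let mut a_out = None;
    let mut b_out = None;
    poll_fn(move |cx| {
        // Both inner futures receive the same context, so they wake the same task.
        if a_out.is_none() {
            if let Poll::Ready(value) = a.as_mut().poll(cx) {
                a_out = Some(value);
            }
        }
        if b_out.is_none() {
            if let Poll::Ready(value) = b.as_mut().poll(cx) {
                b_out = Some(value);
            }
        }
        if a_out.is_some() && b_out.is_some() {
            Poll::Ready((a_out.take().unwrap(), b_out.take().unwrap()))
        } else {
            Poll::Pending
        }
    })
    .await
}

fn demo() -> (&'static str, &'static str) {
    run(join2(
        async {
            yield_times(3).await;
            "slow"
        },
        async { "fast" },
    ))
}

fn main() {
    println!("{:?}", demo());
}
```

The fast future finishes on the first poll, but the tuple only becomes available once the slow one is done as well.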

If you need to join more futures, there are also join3, join4 and join5 functions. Additionally there's join_all, which takes an iterator of futures, and FuturesUnordered, a collection that lets you add more futures incrementally and hands back results in the order the futures complete.
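For the "wait until any of them finishes" question, the idea can be sketched with std alone. Note that this is not how FuturesUnordered is implemented (it only polls futures that were actually woken, while this sketch polls all of them every time); next_completed, fake_hash, Boxed and the file names are hypothetical. All futures in the collection share one output type, which is what gives the result a known type.

```rust
use std::future::{poll_fn, Future};
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

type Boxed<T> = Pin<Box<dyn Future<Output = T>>>;

struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Busy-polling stand-in for an executor, good enough for this demo.
fn run<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(value) = fut.as_mut().poll(&mut cx) {
            return value;
        }
    }
}

/// Future that returns Pending `n` times before completing.
fn yield_times(mut n: u32) -> impl Future<Output = ()> {
    poll_fn(move |cx| {
        if n == 0 {
            Poll::Ready(())
        } else {
            n -= 1;
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    })
}

/// Resolves to the output of whichever future finishes first and removes it
/// from the collection; resolves to None when the collection is empty.
async fn next_completed<T>(futures: &mut Vec<Boxed<T>>) -> Option<T> {
    if futures.is_empty() {
        return None;
    }
    poll_fn(|cx| {
        for i in 0..futures.len() {
            if let Poll::Ready(value) = futures[i].as_mut().poll(cx) {
                // Drop the finished future so it is never polled again.
                drop(futures.swap_remove(i));
                return Poll::Ready(Some(value));
            }
        }
        Poll::Pending
    })
    .await
}

/// Stand-in for a file hasher: "works" for `steps` polls, then returns the name.
fn fake_hash(name: &'static str, steps: u32) -> Boxed<&'static str> {
    Box::pin(async move {
        yield_times(steps).await;
        name
    })
}

fn completion_order() -> Vec<&'static str> {
    let mut pending = vec![
        fake_hash("big.iso", 5),
        fake_hash("small.txt", 1),
        fake_hash("medium.bin", 3),
    ];
    let order = run(async {
        let mut order = Vec::new();
        while let Some(name) = next_completed(&mut pending).await {
            order.push(name);
        }
        order
    });
    order
}

fn main() {
    println!("{:?}", completion_order());
}
```

Everything here still runs inside a single task, so no particular executor support is needed for this pattern.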

Alternatively you can call the spawn function of your executor, which spawns a new task and typically returns a "handle". This handle is a future that is awoken when the new task finishes, and you can use it to wait for the result of the new task. You can also use various channels to send messages between tasks asynchronously.

Note that calling spawn makes you depend on the executor whose spawn function you called, while if you only use join, your library is compatible with every executor.
