Question about Waker / Context in Futures

Hello,
I am trying to wrap my head around how futures work (for educational purposes) and have run into some issues. I rarely see the details of the Waker part explained: how exactly does the waker work? Often I find posts about futures that skip the waking part. For example, I had a very thorough look at this async book for Rust, but it still leaves some questions. Basically my question boils down to:

How exactly can a Waker wake a Future once it has returned Poll::Pending? A very simple solution would obviously be an infinite polling loop, but that is of course not efficient. Otherwise, how can a Future tell the executor that it is ready to be polled again? Assume the future waits for user input or a slow database query; this might take 5 or 20 seconds. How does the future know it should be polled again after the query has resolved? The future is put 'on hold', so how can it interact with the executor if it is not being polled? Maybe I am missing some part, but even with a callback, the Future would still need to run / be active to resolve a callback. From my understanding, though, it is not active / running on a different thread until it is polled again.

So I would be grateful if someone could shed some light on how the Waker works exactly. Many thanks.

Hi there,

maybe you could follow this implementation of a simplified async runtime to better understand the inner mechanics :wink:

Demystifying Async/Await in Rust

The Waker is the means by which the future notifies the executor that it should be polled. It's an opaque callback into the executor's implementation, provided by the executor through the Context.

The means by which the Waker communicates this is up to the executor's implementation.

If you use an async runtime like Tokio, futures are added to a queue. Roughly speaking, Tokio fetches each future from this queue and calls its poll method. When waker.wake() is called (by whatever means - epoll or some background thread), the waker pushes the future back onto this queue. A Mini Tokio version of this model is worked out here: Async in depth | Tokio - An asynchronous Rust runtime. In that link, check out the Updating Mini Tokio section, specifically the run method in the impl MiniTokio code block.
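
Roughly in code (a compressed sketch along the lines of Mini Tokio, not its actual code; Mini Tokio uses the futures crate's ArcWake for the same purpose, and Task plus the queue names here are made up): std::task::Wake lets an Arc-based task be turned into a real Waker, and "waking" is literally just re-sending the task onto the executor's channel.

use std::future::Future;
use std::pin::Pin;
use std::sync::{mpsc, Arc, Mutex};
use std::task::{Context, Wake, Waker};

// One spawned task: its future plus a handle back to the executor's run queue.
struct Task {
    future: Mutex<Pin<Box<dyn Future<Output = ()> + Send>>>,
    queue: Mutex<mpsc::Sender<Arc<Task>>>,
}

// std::task::Wake turns an Arc<Task> into a real Waker: waking simply
// pushes the task back onto the run queue.
impl Wake for Task {
    fn wake(self: Arc<Self>) {
        let _ = self.queue.lock().unwrap().send(self.clone());
    }
}

// The executor's loop: receive whichever tasks have been woken and poll them.
fn run(receiver: mpsc::Receiver<Arc<Task>>) {
    while let Ok(task) = receiver.recv() {
        let waker = Waker::from(Arc::clone(&task));
        let mut cx = Context::from_waker(&waker);
        // A finished task is simply never re-queued, so it is not polled again.
        let _ = task.future.lock().unwrap().as_mut().poll(&mut cx);
    }
}

Between a poll that returns Pending and the next wake(), the task sits in the HashMap/queue structures untouched; the executor spends no CPU on it.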


Thanks everyone.

@2ndTaleStudio Thanks for the book. I'd read it before. I think it was very helpful for understanding async better, but I still have the same question about the waking part. (I also tried to run it but get: "unknown feature llvm_asm".)

And regarding Context, is it correct that you do not have to implement anything yourself, and that it's really just a struct you use to get at the Waker via its waker() method?

And regarding wake, maybe I can use your example from chapter 3.4. I understand this is the definition of wake_by_ref, but I am not quite sure how it works. It wakes a future up, but how? Once a message is sent? You pass the Thought to into_waker(&thought) and register it with the context; that part is clear. But what happens then? Assuming one thought process takes a long time, how would the executor know it needs to poll it again?

impl Wakeable for Thought {
    fn wake_by_ref(self: &Arc<Self>) {
        let clone = Arc::clone(self);

        // Send this Thought back to the executor's queue so it gets polled again.
        self.sender.send(clone);
    }
}

There are any number of ways that wake could work, depending on how the executor is written. One option would be to have an mpsc::channel that tells the executor which wakers have fired:

(NB: Just a sketch, lots of details missing)

struct Waker {
    task_id: usize,
    sender: mpsc::Sender<usize>,
}

impl Waker {
    fn wake_by_ref(&self) {
        // Waking just means telling the executor which task to poll next.
        let _ = self.sender.send(self.task_id);
    }
}

struct Executor {
    tasks: HashMap<usize, Pin<Box<dyn Future<Output = ()>>>>,
    recv: mpsc::Receiver<usize>,
    send: mpsc::Sender<usize>,
}

impl Executor {
    fn run(&mut self) {
        // Block until some waker has fired, then poll only that task.
        while let Ok(task_id) = self.recv.recv() {
            let waker = Waker { task_id, sender: self.send.clone() };
            // (A real executor would wrap a std Waker in a Context here.)
            if let Poll::Ready(_) = self.tasks.get_mut(&task_id).unwrap().as_mut().poll(waker) {
                self.tasks.remove(&task_id);
            }
        }
    }
}

Somewhere there needs to be code that knows when the future can proceed again. The job of the Future implementation is to get a copy of the waker to that code so that it can call the wake() method at an appropriate time. The async machinery will move that waker between different futures as necessary. At the end of the line, though, there needs to be a hand-written Future that knows how to call wake() in response to some event (timer expiration, network data, etc).

Before future::poll returns Pending, it sets the waker up to be called by, for example, the epoll code (so a timer future registers with the timing code, etc.). There is one waker per task, so the whole task gets woken up (I have no idea why).

I have similar questions. I went through the books too and have been looking, without success, for the exact transformation that await does.

Conceptually, each async {} block or function is turned into an enum with one variant per await point; each variant stores all of the local variables that are alive at the corresponding await. The generated poll method dispatches to the poll method of the future being awaited and inspects the result. If the contained future is still Pending, the outer future also returns Pending. If it's Ready(_), however, the async code is run until it reaches another await. When the async block eventually returns a value, the outer future will then return Ready itself.
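
As a very rough illustration only (not the compiler's actual output, which is lower-level than Rust; step_one, step_two, and the enum are made up, and the awaited futures are boxed just to keep the Pin handling simple), an async fn with two awaits behaves approximately like this hand-written state machine:

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

// Hypothetical leaf futures being awaited.
type StepFuture = Pin<Box<dyn Future<Output = u32>>>;

fn step_one() -> StepFuture { Box::pin(async { 1u32 }) }
fn step_two(x: u32) -> StepFuture { Box::pin(async move { x + 1 }) }

// Roughly what the compiler conceptually generates for:
//     async fn example() -> u32 {
//         let x = step_one().await;
//         step_two(x).await
//     }
enum ExampleFuture {
    Start,                       // not yet polled
    AwaitingStepOne(StepFuture), // suspended at the first .await
    AwaitingStepTwo(StepFuture), // suspended at the second .await; `x` has
                                 // already been moved into step_two's future
    Done,
}

impl Future for ExampleFuture {
    type Output = u32;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<u32> {
        // ExampleFuture is Unpin here because it only stores boxed futures.
        let this = self.get_mut();
        loop {
            match this {
                ExampleFuture::Start => {
                    *this = ExampleFuture::AwaitingStepOne(step_one());
                }
                ExampleFuture::AwaitingStepOne(fut) => match fut.as_mut().poll(cx) {
                    // Inner future finished: keep running to the next .await
                    // without returning to the executor.
                    Poll::Ready(x) => *this = ExampleFuture::AwaitingStepTwo(step_two(x)),
                    // Inner future not ready: suspend the whole async fn.
                    Poll::Pending => return Poll::Pending,
                },
                ExampleFuture::AwaitingStepTwo(fut) => match fut.as_mut().poll(cx) {
                    Poll::Ready(out) => {
                        *this = ExampleFuture::Done;
                        return Poll::Ready(out);
                    }
                    Poll::Pending => return Poll::Pending,
                },
                ExampleFuture::Done => panic!("polled after completion"),
            }
        }
    }
}

Note that wake() never appears here: the generated state machine only forwards the Context (and with it the Waker) down to whatever leaf future it is currently awaiting.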

The actual transformation can't really be written out, as it would involve things like unrolling loops. Under the hood, the compiler is working mostly as normal except that the stack frame is stored in an object instead of actually on the stack; this lets execution pause and resume without disrupting the async code's internal state.

You'll notice that nowhere here does wake() get called— The system responsible for actually waiting for and noticing events can't implement its futures via an async block; they have to be hand-written. The async transformation is only there to make the logic that uses these low-level futures easier to write.

So Rust went to a segmented stack for the futures? I thought it kept everything on the OS stack.

I understand how it generally works, but some of the questions I have will only be answered by knowing the exact transformation (I don't understand why unrolling loops matters).

For example, you say that upon calling an async block, I enter the generated future's method and run until I return Ready or Pending, or hit an await. Do I get descheduled then? I thought that only happened when I returned Pending. I see a lot of spurious awaits and async markers in code (e.g., a function that chains to another version with more arguments); do all of those need async on them and need to be awaited? That would seem to be a huge performance hit if any of those awaits had any effect on the execution.

Telling a low-level systems developer that the transformation can't be explained to him because it is too complex doesn't really encourage me to use it. The best systems languages are clear, even if complex, in what they are doing. I don't think async is very clear.

So Rust went to a segmented stack for the futures? I thought it kept everything on the OS stack.

It's a mix. Every local variable (and piece of state like which branch of an if was taken) that exists on both sides of an .await point is stored in the future, so that it can be remembered when the future is polled again. Everything which only exists between await points, and therefore inside of a single call to poll(), is stored in the regular call stack, because it doesn't ever need to be part of the future.

This distinction also affects the visible properties of the future, not just its implementation — for example, if a future is required to be Send, then it can work with !Send values as long as it doesn't hold them across an await point, since those correspond directly to situations where that !Send value might end up carried along with the future to another thread, which would be incorrect. Example:

async {
    // This variable is used on both sides of the await point below,
    // so it is stored in the future.
    let mut weird_counter: i32 = 0;

    while let Some(item) = some_queue.pop().await {

        // this value is !Send, but never crosses an await point, so it
        // is only stored on the call stack, like a regular function's
        // local variable
        let mut rng = rand::thread_rng();

        weird_counter += item * rng.gen_range(0..1);
    }
    weird_counter
}

I enter the generated future's method and run until I return Ready or Pending, or hit an await. Do I get descheduled then? I thought that only happened when I returned Pending

No. When an async block reaches an await, it polls the awaited future. If that future returns Poll::Ready, then the async block continues execution and does not itself return. The outer future only suspends itself, returning Poll::Pending, if the inner future did.


Hi,

I'm sorry for this. I have not updated the source code for a while. It was written while Rust transitioned the asm! macro to a new version and introduced llvm_asm!, which was available behind a feature flag. However, the new asm! is now stabilised, so I need to update my example code...

Regarding the other questions you had, I guess they have already been answered to a great extent.

Just let me emphasise the waking part once more:
At the very end there will be a "hand-written" Future implementation that decides in its poll() implementation what to do with the Waker it was given, in case it is about to return Pending. As an example, this could mean that the Waker is passed to an interrupt service routine. As soon as the interrupt fires, wake or wake_by_ref is called on the Waker that was passed to it. Waking the Future adds it to a kind of processing queue within the async executor. The executor will pick up the Future and call poll() again. And the cycle repeats until the Future returns Poll::Ready.

If in your example the Future does some long-running computation, the following approach may be useful (a rough code sketch follows below the list):

  • The first call to poll will spawn a new thread that does the heavy computation. The Waker of the Future is passed into this thread, and poll returns Poll::Pending.
  • Once the thread finishes the heavy computation, it uses the Waker stored in it to wake the Future.
  • The executor will then poll this Future again. Now the result of the computation is available in some shared state, can be picked up, and the Future returns Poll::Ready.
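
A minimal sketch of that pattern (HeavyComputation and Shared are made-up names, and a plain OS thread stands in for the slow work); the key point is that a clone of the Waker travels to wherever the result will eventually appear:

use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Waker};
use std::thread;

// State shared between the future and the worker thread.
struct Shared {
    result: Option<u64>,
    waker: Option<Waker>,
}

struct HeavyComputation {
    shared: Option<Arc<Mutex<Shared>>>,
}

impl Future for HeavyComputation {
    type Output = u64;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<u64> {
        let this = self.get_mut();

        if this.shared.is_none() {
            // First poll: spawn the worker and hand it the Waker.
            let shared = Arc::new(Mutex::new(Shared {
                result: None,
                waker: Some(cx.waker().clone()),
            }));
            let for_thread = Arc::clone(&shared);
            thread::spawn(move || {
                let answer: u64 = (0..1_000_000u64).sum(); // stand-in for the slow work
                let mut s = for_thread.lock().unwrap();
                s.result = Some(answer);
                // Notify the executor that this task can be polled again.
                if let Some(waker) = s.waker.take() {
                    waker.wake();
                }
            });
            this.shared = Some(shared);
            return Poll::Pending;
        }

        let mut s = this.shared.as_ref().unwrap().lock().unwrap();
        match s.result.take() {
            Some(answer) => Poll::Ready(answer),
            None => {
                // Polled again before the thread finished: refresh the stored
                // Waker, since the task may have moved to another executor thread.
                s.waker = Some(cx.waker().clone());
                Poll::Pending
            }
        }
    }
}

Between the first poll() and the wake() call, the executor does nothing with this task at all; there is no busy loop, the task is simply not in the run queue until the worker thread puts it back there by waking it.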

Hope this sheds further light on the async complexity.


The problem isn’t so much that the transformation is complex, but rather that it operates at a lower level than Rust syntax— There’s no directly-corresponding Rust code to the output of the transformation. To see the actual transformation output, you’ll need to look at something like MIR, which I’m not familiar enough with to comment on.

NB: I’m not an expert in any of this, so some statements here may be incorrect.

If I'm understanding this properly:

The only way you give up the thread is by calling await and then returning Pending? In which case the Pending result percolates up the stack and pops out in the runtime code. So there is nothing going on besides just a bunch of returns (and this is why there's the partially segmented stack for things that need to live through an await -- this is pretty trivial to understand, but the various descriptions make it into a much more complex thing).

So the only real magic is in the await transformation that I'm still trying to find documented or written about somewhere.

Thanks. It's been enlightening, definitely.

I think you've got it right, but to be a little more precise:

  • A Future implementation gives up control by returning Poll::Pending from its poll() method.
  • The Future implementation that is generated for an async block (or async fn) will do that when, and only when, execution reaches some a_future.await operation where the nested future a_future returns Poll::Pending from its poll() method. It will not do that in any other circumstances.
