Polling a Future while it executes

I'm trying to set up a scenario where a Future has some work inside it, like this:

use futures::{Async, Future};

pub struct Worker(pub bool);

impl Worker {
    pub fn start<F>(&mut self, work: F) where F: Fn() {
        work();
        self.0 = true;
    }
}

impl Future for Worker {
    type Item = ();
    type Error = ();

    fn poll(&mut self) -> Result<Async<Self::Item>, Self::Error> {
        print!("Poll: ");

        if self.0 {
            println!("Ready");
            Ok(Async::Ready(()))
        } else {
            println!("Notready");
            Ok(Async::NotReady)
        }
    }
}

It should do some work while being polled periodically, asking itself "Is the work ready?" So I wrote this:

use futures::Future; // brings wait() into scope

fn main() {
    let mut w = worker::Worker(false);

    w.start(|| {
        for _ in 0..1e6 as u64 {
            10f64.powf(10f64);
        }
    });

    w.wait();
}

But I don't see any polling attempts in stdout. Why not?

Futures don’t run themselves - there needs to be an executor that drives them (i.e. polls them), such as tokio (though it doesn’t have to be tokio).

You also need to be careful with wait() - it blocks the current thread, but if the current thread is running an event loop, you’ll halt the entire execution.

@vitalyd, could you show how to do that in the context of my example?

I don't think I fully understand what you'd like to do here. As you have it, start() does all the computation upfront, and then the first poll() would technically just return Async::Ready. What are you trying to do? What is the asynchronous action here? Perhaps you can describe your intentions a bit more.

@vitalyd, let me try to explain more clearly. I need this scenario:
There is some long-running work, and I want to do it asynchronously while displaying the result of each poll.

Any reason you want to use futures for this? Note that manually polling a future in futures 0.2 is going to be trickier because poll takes a &mut Context parameter, and you won't really have one in hand unless you're building an executor as well.

What is the "main" thread going to do while that expensive work is happening? Also, is it expensive in terms of CPU, or does it actually do I/O? Is the work serial, or can it be parallelized?

A simpler approach might be to just use a background thread connected to the main thread with a channel, and communicate results over that channel. But you still haven't given enough concrete information :slight_smile:.
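For illustration, the background-thread approach could look roughly like the sketch below. It uses only the standard library. The names run_and_poll and expensive_job are made up for this example, and the 1 ms sleep between polls is an arbitrary choice.

```rust
use std::sync::mpsc::{self, TryRecvError};
use std::thread;
use std::time::Duration;

/// Hypothetical stand-in for the "long work" from the question.
fn expensive_job() -> f64 {
    let mut acc = 0.0;
    for _ in 0..1_000_000u64 {
        acc += 10f64.powf(10f64);
    }
    acc
}

/// Runs `job` on a background thread and polls the channel from the calling
/// thread until the result arrives. Returns the result together with the
/// number of polls that found nothing yet.
fn run_and_poll<T, F>(job: F) -> (T, u32)
where
    T: Send + 'static,
    F: FnOnce() -> T + Send + 'static,
{
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        // The receiver stays alive until run_and_poll returns, so a failed
        // send is ignored here.
        let _ = tx.send(job());
    });

    let mut not_ready = 0;
    loop {
        match rx.try_recv() {
            Ok(value) => return (value, not_ready),
            Err(TryRecvError::Empty) => {
                // "Is the work ready?" Not yet; the main thread is free to
                // do something else here before asking again.
                not_ready += 1;
                thread::sleep(Duration::from_millis(1));
            }
            Err(TryRecvError::Disconnected) => {
                panic!("worker thread exited without sending a result")
            }
        }
    }
}

fn main() {
    let (value, not_ready) = run_and_poll(expensive_job);
    println!("Ready after {} empty polls: {}", not_ready, value);
}
```

The try_recv() call never blocks, so the main thread decides how often to ask. Replacing the loop with rx.recv() would block until the result arrives instead.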

I chose futures = "0.1.21", in which Future still has poll(&mut self).

My reason is academic interest: I need to do asynchronous work and I want to see how poll() behaves when the work is not yet finished.

Imagine the work as something abstract, for example:

let job = || {
    for _ in 0..1e6 as u64 {
        10f64.powf(10f64);
    }
};

I can give you a general high-level explanation of this part.

First, there's nothing inherently special about poll() - you can drive the future yourself with an executor that does busy looping:

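use futures::{Async, Future};

// Field 0 is unused in this example; field 1 counts the remaining polls.
pub struct Worker(bool, u64);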

impl Future for Worker {
    type Item = ();
    type Error = ();

    fn poll(&mut self) -> Result<Async<Self::Item>, Self::Error> {
        print!("Poll: ");
        let _ = 10f64.powf(10f64);
        self.1 -= 1;
        if self.1 == 0 {
            println!("Ready");
            Ok(Async::Ready(()))
        } else {
            println!("Notready");
            Ok(Async::NotReady)
        }
    }
}

fn execute_to_completion<F: Future>(mut f: F) -> Result<F::Item, F::Error> {
    loop {
        match f.poll() {
            Ok(Async::Ready(x)) => return Ok(x),
            Ok(Async::NotReady) => {} // keep looping
            Err(e) => return Err(e),
        }
    }
}

fn main() {
    let w = Worker(false, 10);
    execute_to_completion(w).unwrap();
}

This is of course wasteful because it's just a fancy inline loop. In practice, a future encapsulates some truly asynchronous event: completion of I/O, execution of some work on a threadpool, etc. When an executor polls the future, the future may not be complete yet. In that case, a typical executor will not call poll() on it again on its own; otherwise it would be doing the same busy looping we just did above. Instead, the future arranges for a notification to be sent when whatever event it's waiting for is ready (e.g. some bytes are available on the socket, the threadpool finished the work, etc). This notification is tied to a concept called a Task, which is the unit the executor uses to run the future (or a chain of futures). Once the task is notified, the executor calls poll() on the future again, and the future can then make forward progress. This way futures lie dormant in the executor and are only re-polled when they're ready to do something.
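To make the notification idea concrete, here is a rough sketch of an executor that sleeps instead of spinning. Note the assumption: it is written against the standard library's std::future::Future and std::task types, which are newer than the futures 0.1 API used elsewhere in this thread. There, Poll::Pending plays the role of Async::NotReady, and a Waker plays the role of the Task handle. ThreadJob, ThreadWaker and block_on are hypothetical names for this illustration, not part of any library.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// The "notification": waking unparks the executor's thread.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Minimal single-future executor: poll, then sleep until notified.
/// Returns the output and the number of times poll() was called.
fn block_on<F: Future>(fut: F) -> (F::Output, u32) {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    let mut polls = 0;
    loop {
        polls += 1;
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return (out, polls),
            // No busy loop: block until some waker unparks this thread.
            Poll::Pending => thread::park(),
        }
    }
}

/// State shared between the future and its worker thread.
struct Shared<T> {
    result: Option<T>,
    waker: Option<Waker>,
}

/// Hypothetical future that runs a closure on a background thread.
struct ThreadJob<T> {
    job: Option<Box<dyn FnOnce() -> T + Send>>,
    shared: Arc<Mutex<Shared<T>>>,
}

impl<T: Send + 'static> ThreadJob<T> {
    fn new(job: impl FnOnce() -> T + Send + 'static) -> Self {
        ThreadJob {
            job: Some(Box::new(job)),
            shared: Arc::new(Mutex::new(Shared { result: None, waker: None })),
        }
    }
}

impl<T: Send + 'static> Future for ThreadJob<T> {
    type Output = T;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
        let this = self.get_mut();
        let mut shared = this.shared.lock().unwrap();
        if let Some(value) = shared.result.take() {
            return Poll::Ready(value);
        }
        // Not done yet: remember whom to notify, then release the lock.
        shared.waker = Some(cx.waker().clone());
        drop(shared);

        // The first poll starts the work; later polls only check on it.
        if let Some(job) = this.job.take() {
            let shared = Arc::clone(&this.shared);
            thread::spawn(move || {
                let value = job();
                let mut shared = shared.lock().unwrap();
                shared.result = Some(value);
                if let Some(waker) = shared.waker.take() {
                    waker.wake(); // tell the executor to poll again
                }
            });
        }
        Poll::Pending
    }
}

fn main() {
    let (value, polls) = block_on(ThreadJob::new(|| 6 * 7));
    println!("result = {}, poll() was called {} times", value, polls);
}
```

The work itself runs on the spawned thread. poll() only starts it and later reports its status, and the executor thread stays parked in between. thread::park() may also return spuriously; in that case the future is simply polled once more and returns Pending again.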

Official tokio docs go into this execution model as well.

@vitalyd, I've got it. Do I understand correctly?

  1. In a Future, the asynchronous work should always be inside poll().
  2. A Future can make forward progress only when poll() is called.

Well, poll() is the executor's API to the future - it's how it's able to determine the status of the future (done, not done, errored). Asynchronous work is typically done, well, asynchronously :slight_smile:.

If a Future represents some work executing on a threadpool, for example, then that work can be progressing concurrently. The Future only tells the executor whether it's done or not (via poll()), and to determine that it needs some way of communicating with the threadpool.

If a Future is waiting on bytes to be available for reading on the socket, then that Future will have registered an event notification to be delivered by whatever OS kernel API when there's some data to be read. That work is also happening concurrently, and not even in userland in this case.

So it's better to think of poll() as just the API an executor has to ask the Future for its status.

I think I sort of answered this above as well. It's not that the Future's work makes progress per se, but rather that poll() is how the executor drives the future to completion: it asks the future for its status, and if the future is not ready, the executor keeps it stored somewhere until a task notification has been received, at which point the future is re-polled. The most interesting case is when a future returns Ok(Async::NotReady). To avoid busy polling it, as in the dummy executor I showed earlier, there's machinery for the future to return NotReady but also register a notification, so that when the asynchronous operation makes progress, the future is re-polled.

For a deep dive on how tokio interoperates with futures, take a look at the article "Tokio internals: Understanding Rust's asynchronous I/O framework from the bottom up" on Caffeinated Bitstream. It should make the connection between a future, a task (i.e. notification mechanism), and an executor (tokio event loop in this case) a bit more concrete.

You wrote about an executor. Could you show how to use one in the context of your example?

P.S.: I've read the documentation about Executor, but I didn't find an example of executing a Future.
I'm also trying to understand this example with an event loop.
In these examples I don't see how to run a Future on an Executor.

Here's an example using tokio 0.1:

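use futures::{Async, Future};

// Field 0 is unused in this example; field 1 counts the remaining polls.
pub struct Worker(bool, u64);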

impl Future for Worker {
    type Item = ();
    type Error = ();

    fn poll(&mut self) -> Result<Async<Self::Item>, Self::Error> {
        print!("Poll: ");
        let _ = 10f64.powf(10f64);
        self.1 -= 1;
        if self.1 == 0 {
            println!("Ready");
            Ok(Async::Ready(()))
        } else {
            println!("Notready");
            futures::task::current().notify();
            Ok(Async::NotReady)
        }
    }
}

fn execute_to_completion<F: Future<Item = (), Error = ()> + Send + 'static>(f: F) {
    tokio::run(f);
}

fn main() {
    let w = Worker(false, 10);
    execute_to_completion(w);
}

Note we need to add futures::task::current().notify() before returning NotReady because tokio's executor will not poll us automatically if we return NotReady (otherwise it would need to spin, which we don't want). notify() is a way, via the task associated with the future, to tell the tokio executor that it should, in fact, poll us again. This becomes a fancy form of a yield - the tokio executor may execute other tasks that are ready, but will come back to us.

A good small example of a custom Future using the Task mechanism might be the server in the sccache project.

There's a WaitUntilZero struct that's used to track when there are zero in-progress connections (I believe). It implements Future, and when it is polled while there are still in-progress connections, it returns NotReady but stashes the current Task. As active connections go away, it decrements a count, and when the count reaches 0 it notifies the Task associated with the future. This, in turn, tells the executor (tokio in sccache's case) to re-poll WaitUntilZero.
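The pattern described above might be sketched as follows. This is not sccache's actual code. It is a simplified illustration written against the standard library's std::future and std::task types rather than futures 0.1, so a stored Waker stands in for the stashed Task. Inner, ActiveGuard, Flag and demo are hypothetical names.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

/// Shared state: how many connections are active, and whom to notify.
struct Inner {
    active: usize,
    waker: Option<Waker>,
}

/// Held by each in-progress connection; dropping it decrements the count.
struct ActiveGuard(Arc<Mutex<Inner>>);

impl Drop for ActiveGuard {
    fn drop(&mut self) {
        let mut inner = self.0.lock().unwrap();
        inner.active -= 1;
        if inner.active == 0 {
            // Last connection gone: notify whoever polled us earlier.
            if let Some(waker) = inner.waker.take() {
                waker.wake();
            }
        }
    }
}

/// Future that completes once no guards remain.
struct WaitUntilZero(Arc<Mutex<Inner>>);

impl WaitUntilZero {
    fn new() -> Self {
        WaitUntilZero(Arc::new(Mutex::new(Inner { active: 0, waker: None })))
    }

    /// Registers one more active connection.
    fn guard(&self) -> ActiveGuard {
        self.0.lock().unwrap().active += 1;
        ActiveGuard(Arc::clone(&self.0))
    }
}

impl Future for WaitUntilZero {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        let mut inner = self.0.lock().unwrap();
        if inner.active == 0 {
            Poll::Ready(())
        } else {
            // Stash the waker so the last guard can trigger a re-poll.
            inner.waker = Some(cx.waker().clone());
            Poll::Pending
        }
    }
}

/// Waker for the demo that only records whether it was notified.
struct Flag(AtomicBool);

impl Wake for Flag {
    fn wake(self: Arc<Self>) {
        self.0.store(true, Ordering::SeqCst);
    }
}

/// Polls by hand and reports: notified after the first drop, notified
/// after the last drop, and ready on the final poll.
fn demo() -> (bool, bool, bool) {
    let flag = Arc::new(Flag(AtomicBool::new(false)));
    let waker = Waker::from(Arc::clone(&flag));
    let mut cx = Context::from_waker(&waker);

    let mut wait = WaitUntilZero::new();
    let (a, b) = (wait.guard(), wait.guard());
    assert!(Pin::new(&mut wait).poll(&mut cx).is_pending());

    drop(a);
    let after_first = flag.0.load(Ordering::SeqCst);
    drop(b);
    let after_last = flag.0.load(Ordering::SeqCst);
    let ready = Pin::new(&mut wait).poll(&mut cx).is_ready();
    (after_first, after_last, ready)
}

fn main() {
    println!("{:?}", demo());
}
```

In a real executor, the waker would schedule the task for another poll instead of setting a flag. The demo polls by hand only to make the order of events visible.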

@vitalyd, thank you for your tutorial!