Why does pollster use synchronization primitives?

This is the implementation of pollster, which is a simple async executor that blocks the current thread until a future is resolved:

The Signal struct is implemented using a mutex and a condition variable. This confused me because pollster runs all computations on a single thread. Why does it need these synchronization primitives?

I found another implementation of a simple, single-threaded async executor in the Wake trait docs:

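use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake};
use std::thread::{self, Thread};

/// A waker that unparks the stored thread when woken.
struct ThreadWaker(Thread);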

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Run a future to completion on the current thread.
fn block_on<T>(fut: impl Future<Output = T>) -> T {
    // Pin the future so it can be polled.
    let mut fut = pin!(fut);

    // Create a new context to be passed to the future.
    let t = thread::current();
    let waker = Arc::new(ThreadWaker(t)).into();
    let mut cx = Context::from_waker(&waker);

    // Run the future to completion.
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(res) => return res,
            Poll::Pending => thread::park(),
        }
    }
}

block_on(async {
    println!("Hi from inside a future!");
});

It doesn't use any synchronization primitives and appears to work just as well as pollster.

So why does pollster use these synchronization primitives?

cc @zesterer

I think it doesn't need them and they are just an implementation detail (to create a Waker, you need an Arc containing something that is Send and Sync). There is a PR for a lock-free version of pollster that does not require synchronization primitives and looks more similar to futures' LocalPool's ThreadNotify version of a waker signal.

Both implementations basically put the thread that is polling the Future to sleep when it returns Poll::Pending, and then wake it up when Waker::wake is called. One uses thread::park directly (with the matching unpark in ThreadWaker::wake), and the other does the same thing using a Mutex and a Condvar.

  1. Thread parking is a synchronization primitive.
  2. What pollster does is the cheaper way to do it in the single-threaded case, since it doesn't park the thread when the future wakes itself from within poll.

Even though pollster doesn't use additional threads, the futures themselves may do so. For this reason Waker is Send, and implementations need to ensure it can be used from different threads, which requires some synchronization primitive (thread parking included).
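To make that concrete, here is a minimal sketch of a Mutex + Condvar based block_on. It is a simplified illustration and not pollster's actual code: the Signal type with a single bool flag and the wake_from_other_thread helper are hypothetical names chosen for this example. The point is that the waker may be called from another thread, and that a notification arriving before the executor starts waiting is remembered in the flag, so the wait returns immediately.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Condvar, Mutex};
use std::task::{Context, Poll, Wake, Waker};
use std::thread;
use std::time::Duration;

/// Hypothetical, simplified signal: a "notified" flag guarded by a mutex.
struct Signal {
    notified: Mutex<bool>,
    cond: Condvar,
}

impl Signal {
    /// Block until a notification is available, then consume it.
    fn wait(&self) {
        let mut notified = self.notified.lock().unwrap();
        // If the flag is already set, this loop is skipped and we never sleep.
        while !*notified {
            notified = self.cond.wait(notified).unwrap();
        }
        *notified = false;
    }

    /// Record a notification and wake the waiting thread, if any.
    fn notify(&self) {
        *self.notified.lock().unwrap() = true;
        self.cond.notify_one();
    }
}

impl Wake for Signal {
    fn wake(self: Arc<Self>) {
        self.notify();
    }
}

/// Run a future to completion on the current thread.
fn block_on<T>(fut: impl Future<Output = T>) -> T {
    let mut fut = pin!(fut);
    let signal = Arc::new(Signal {
        notified: Mutex::new(false),
        cond: Condvar::new(),
    });
    let waker = Waker::from(signal.clone());
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(res) => return res,
            Poll::Pending => signal.wait(),
        }
    }
}

/// Example future whose waker is invoked from a different thread.
fn wake_from_other_thread() -> u32 {
    let mut spawned = false;
    let done = Arc::new(AtomicBool::new(false));
    block_on(std::future::poll_fn(move |cx| {
        if done.load(Ordering::Acquire) {
            return Poll::Ready(42);
        }
        if !spawned {
            spawned = true;
            let waker = cx.waker().clone();
            let done = done.clone();
            thread::spawn(move || {
                thread::sleep(Duration::from_millis(10));
                done.store(true, Ordering::Release);
                waker.wake(); // called on the helper thread
            });
        }
        Poll::Pending
    }))
}
```

Calling wake_from_other_thread() blocks the calling thread until the helper thread wakes it, then returns the value produced by the future.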

Read the disclaimer on that page:

Note: This example trades correctness for simplicity. In order to prevent deadlocks, production-grade implementations will also need to handle intermediate calls to thread::unpark as well as nested invocations.

Here is a future which has an “intermediate call to thread::unpark” that breaks it:

block_on({
    let mut first = true;
    futures::future::poll_fn(move |ctx| {
        if first {
            first = false;
            ctx.waker().wake_by_ref(); // imagine this was run on another thread
            thread::current().unpark(); // ensure token is present
            thread::park(); // will always unpark immediately
            // now we have no token and will fail to progress
            Poll::Pending
        } else {
            println!("Hi from inside a future!");
            Poll::Ready(())
        }
    })
})

This future will execute fine in tokio or pollster, but hang the park-only executor. The fundamental problem here is that the only waking signal used is the parking token, but that token can be stolen by any code that runs on the same thread — it's a global resource. In general (not just in implementing block_on()), I believe it’s only correct to use unpark() to wake a thread when you know the thread is currently running code you wrote, because if code you didn’t write is running, it can steal the token and thus prevent the expected wakeup.

Actually, “currently running code you wrote” is too strong. You can use parking regardless of what the thread is doing, as long as you always check your actual wakeup condition before parking, with no rogue unpark() between those two points. The problem with this executor is that it has no way to check that condition; it can only park and hope it gets an unpark, but the parking state is a shared resource of sorts, so it can’t correctly rely on that.
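As a rough sketch of what "check your actual wakeup condition before parking" could look like, here is the park-based executor with an added AtomicBool that records whether the waker was really called. The names FlagWaker, woken, and token_stealing_future are hypothetical and only for illustration; this is not how pollster or the std docs implement it. The executor parks only while the flag is unset, so a stolen parking token no longer causes a missed wakeup.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Hypothetical waker: remembers that it was woken, then unparks the thread.
struct FlagWaker {
    thread: Thread,
    woken: AtomicBool,
}

impl Wake for FlagWaker {
    fn wake(self: Arc<Self>) {
        // Set the real wakeup condition first, then unpark.
        self.woken.store(true, Ordering::Release);
        self.thread.unpark();
    }
}

/// Run a future to completion on the current thread.
fn block_on<T>(fut: impl Future<Output = T>) -> T {
    let mut fut = pin!(fut);
    let state = Arc::new(FlagWaker {
        thread: thread::current(),
        woken: AtomicBool::new(false),
    });
    let waker = Waker::from(state.clone());
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(res) = fut.as_mut().poll(&mut cx) {
            return res;
        }
        // Park only while the flag is unset. A wake that lands between the
        // swap and the park leaves a token, so park returns and we re-check.
        while !state.woken.swap(false, Ordering::Acquire) {
            thread::park();
        }
    }
}

/// The token-stealing future from above, written with std's poll_fn.
fn token_stealing_future() -> &'static str {
    let mut first = true;
    block_on(std::future::poll_fn(move |cx| {
        if first {
            first = false;
            cx.waker().wake_by_ref(); // sets the flag and the token
            thread::current().unpark(); // ensure token is present
            thread::park(); // consumes the token
            Poll::Pending
        } else {
            Poll::Ready("done")
        }
    }))
}
```

With the flag in place, token_stealing_future() completes instead of hanging: the token is gone after the first poll, but the flag is still set, so the executor polls again without parking.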

This reminds me of the problem of checking for wakeup in any future that is expecting a message, as demonstrated in AtomicWaker’s documentation: you have to check for the wakeup condition, register a waker, and then check the condition again, because otherwise the condition might have become true in the time window between the first check and registering the waker. The common element here is that your code has to make sure it doesn’t forget to wake up due to outside events happening concurrently in an inconvenient order rather than a convenient one.
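The check, register, check-again pattern can be sketched with std alone. The example below uses a Mutex<Option<Waker>> as a stand-in for AtomicWaker, and the Flag, Inner, CountingWaker, and demo names are hypothetical; treat it as an illustration of the ordering rather than a production implementation.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

struct Inner {
    set: AtomicBool,
    waker: Mutex<Option<Waker>>, // stand-in for AtomicWaker
}

/// Hypothetical future that completes once `signal` has been called.
#[derive(Clone)]
struct Flag(Arc<Inner>);

impl Flag {
    fn new() -> Self {
        Flag(Arc::new(Inner {
            set: AtomicBool::new(false),
            waker: Mutex::new(None),
        }))
    }

    /// Set the condition first, then wake whoever is registered.
    fn signal(&self) {
        self.0.set.store(true, Ordering::SeqCst);
        let waker = self.0.waker.lock().unwrap().take();
        if let Some(w) = waker {
            w.wake();
        }
    }
}

impl Future for Flag {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        // 1. Quick check: skip registration if already signalled.
        if self.0.set.load(Ordering::SeqCst) {
            return Poll::Ready(());
        }
        // 2. Register the waker.
        *self.0.waker.lock().unwrap() = Some(cx.waker().clone());
        // 3. Check again: `signal` may have run between steps 1 and 2,
        //    in which case it found no waker to call.
        if self.0.set.load(Ordering::SeqCst) {
            Poll::Ready(())
        } else {
            Poll::Pending
        }
    }
}

/// Test waker that only counts how often it was woken.
struct CountingWaker(AtomicUsize);

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

/// Poll by hand: pending first, one wake on signal, ready afterwards.
fn demo() -> (bool, usize, bool) {
    let counter = Arc::new(CountingWaker(AtomicUsize::new(0)));
    let waker = Waker::from(counter.clone());
    let mut cx = Context::from_waker(&waker);
    let mut flag = Flag::new();
    let sender = flag.clone();
    let first_pending = Pin::new(&mut flag).poll(&mut cx).is_pending();
    sender.signal();
    let wakes = counter.0.load(Ordering::SeqCst);
    let then_ready = Pin::new(&mut flag).poll(&mut cx).is_ready();
    (first_pending, wakes, then_ready)
}
```

Without step 3, a signal that arrives between the first check and the registration would set the condition but find no waker, and the future would return Poll::Pending with nobody left to wake it.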
