How are async `JoinHandle`s implemented?

I've been educating myself about Rust's implementation of async/await lately, and my understanding is that Futures can schedule themselves to be polled at a later moment by calling wake() on the Waker, which will cause the executor to call poll again.
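As a minimal sketch of that wake-then-poll-again cycle, using only the standard library: `YieldOnce`, `CountingWaker` and `run_to_completion` are hypothetical names made up for illustration, and the loop below is a toy stand-in for an executor, not how any real runtime schedules tasks.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical future: returns `Pending` once, then `Ready(7)`.
struct YieldOnce {
    yielded: bool,
}

impl Future for YieldOnce {
    type Output = u32;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<u32> {
        if self.yielded {
            Poll::Ready(7)
        } else {
            self.yielded = true;
            // Ask to be polled again before returning `Pending`.
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

/// Counts wake-ups; a real executor would re-queue the task here instead.
struct CountingWaker {
    wakes: AtomicUsize,
}

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.wakes.fetch_add(1, Ordering::SeqCst);
    }
}

/// Toy executor: polls `fut` to completion and returns the output together
/// with the number of times `poll` was called.
fn run_to_completion<F: Future + Unpin>(mut fut: F) -> (F::Output, usize) {
    let state = Arc::new(CountingWaker { wakes: AtomicUsize::new(0) });
    let waker = Waker::from(state.clone());
    let mut cx = Context::from_waker(&waker);
    let mut polls = 0;
    loop {
        polls += 1;
        if let Poll::Ready(out) = Pin::new(&mut fut).poll(&mut cx) {
            return (out, polls);
        }
        // Only poll again if the future asked for it through the waker.
        let woken = state.wakes.swap(0, Ordering::SeqCst) > 0;
        assert!(woken, "future returned Pending without arranging a wake-up");
    }
}
```

Calling `run_to_completion(YieldOnce { yielded: false })` polls twice: the first poll wakes and returns `Pending`, the second returns `Ready(7)`.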

Assuming that's correct, I'm not sure what the executor would do with a Poll::Ready(T) when T is anything other than ().

Indeed, the futures RFC mentions that:

executors provide the ability to create tasks from ()-producing Futures

However, many runtimes provide a free-standing function to spawn tasks whose signature, modulo any Send, Sync and 'statics, is something like

pub fn spawn<F: Future>(future: F) -> JoinHandle<F::Output> {}

where JoinHandle<T> is itself a future that resolves to T.

I've been thinking about how this could be implemented under the assumption that the executor can only spawn ()-producing Futures.

In pseudo-code, assuming there's a global EXECUTOR, this could be:

pub fn spawn<F: Future>(future: F) -> JoinHandle<F::Output> {
    let (tx, rx) = oneshot::channel();
    let future = async move {
        let out = future.await;
        let _ = tx.send(out);
    };
    EXECUTOR.spawn(future);
    JoinHandle::new(rx)
}

pub struct JoinHandle<T> {
    rx: oneshot::Receiver<T>
}

impl<T> Future for JoinHandle<T> {
    type Output = T;
    
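    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Poll the receiver itself instead of calling try_recv(): try_recv()
        // never registers cx's waker, so a Pending handle would never be woken.
        // This assumes the oneshot Receiver is an Unpin future, as in futures or tokio.
        match Pin::new(&mut self.rx).poll(cx) {
            Poll::Ready(Ok(value)) => Poll::Ready(value),
            // Sender dropped without sending; assumed not to happen here.
            Poll::Ready(Err(_)) => unreachable!(),
            Poll::Pending => Poll::Pending,
        }
    }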
}

But I doubt tokio, async-std or smol are actually using channels to do this.

Am I completely off? What would be the proper way to do this?

They can probably make it more efficient by using a single memory allocation for the task's future and the channel's value storage, but other than that, why shouldn't they do it the way you show?
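To illustrate the single-allocation idea, here is a hedged sketch in which one slot first holds the future and later holds its output. `Stage`, `poll_in_place`, `take_output` and `noop_waker` are hypothetical names, and the `F: Unpin` bound is a simplification that keeps the sketch in safe Rust; a real runtime would need pin projection or unsafe code to support arbitrary futures.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// One slot that holds the future while it runs and its output afterwards,
/// so a task needs no separate channel buffer for the result.
enum Stage<F: Future> {
    Running(F),
    Finished(F::Output),
    Consumed,
}

impl<F: Future + Unpin> Stage<F> {
    /// Executor side: polls the future in place and, once it is ready,
    /// overwrites it with its output. Returns true when the task is done.
    fn poll_in_place(&mut self, cx: &mut Context<'_>) -> bool {
        if let Stage::Running(fut) = self {
            match Pin::new(fut).poll(cx) {
                Poll::Ready(out) => *self = Stage::Finished(out),
                Poll::Pending => return false,
            }
        }
        true
    }

    /// Join-handle side: moves the output out, at most once.
    fn take_output(&mut self) -> Option<F::Output> {
        match std::mem::replace(self, Stage::Consumed) {
            Stage::Finished(out) => Some(out),
            other => {
                // Still running, or already consumed: restore the slot.
                *self = other;
                None
            }
        }
    }
}

/// Waker that does nothing, only needed to build a `Context` for the demo.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

fn noop_waker() -> Waker {
    Waker::from(Arc::new(NoopWake))
}
```

With a slot like this, the spawn function would allocate one `Stage<F>` (plus whatever synchronization wraps it) per task, and both the executor and the join handle would point at it.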


You can take a look at how Tokio implements JoinHandle, but beware that this is a very non-trivial implementation.

In drastically simplified terms (noting that Tokio uses unsafe and careful coding instead of Mutex and Arc), a Tokio task looks a little bit like:

use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex, Weak};
use std::task::{Context, Poll, Waker};

struct SimplifiedTask<ReturnType> {
    join_waker: Mutex<Option<Waker>>,
    result: Arc<Mutex<Option<ReturnType>>>,
}

struct JoinHandle<ReturnType> {
    task: Weak<SimplifiedTask<ReturnType>>,
    result: Arc<Mutex<Option<ReturnType>>>,    
}

When the task naturally completes, it does an equivalent of the following:

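impl<ReturnType> SimplifiedTask<ReturnType> {
    fn exit_task(self, result: ReturnType) {
        // Store the result first so it is visible before anyone is woken.
        *(self.result.lock().expect("poison")) = Some(result);
        // Then wake the JoinHandle, if it registered a waker.
        if let Some(waker) = self.join_waker.into_inner().expect("poison") {
            waker.wake()
        }
    }
}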

This first saves the task's result, and then wakes whatever registered a waker asking to be notified when the task finishes (here, the JoinHandle).

Then, JoinHandle's impl Future does the equivalent of the following code:

impl<ReturnType> Future for JoinHandle<ReturnType> {
    type Output = ReturnType;

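    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        match self.task.upgrade() {
            Some(task) => {
                // Task still alive: record who to wake when it exits.
                *task.join_waker.lock().expect("poison") = Some(cx.waker().clone());
                Poll::Pending
            }
            None => {
                // Task is gone: take the result it stored on exit. We cannot
                // move self.result out from behind Pin<&mut Self>, so take
                // the value out of the shared slot instead.
                let value = self.result.lock().expect("poison").take();
                Poll::Ready(value.expect("Task exit sets result"))
            }
        }
    }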
}

Please note that this code is very different from Tokio's code: it does the same thing as Tokio using only safe Rust, but is a lot less efficient as a result.
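For something that can be run end to end, here is a hedged, std-only variant of the same idea. It is a sketch under its own assumptions, not Tokio's design: `Shared`, `Completer`, `join_pair`, `ThreadWaker` and `block_on` are hypothetical names, and it uses a single `Arc<Mutex<...>>` holding both the result and the waker rather than the `Weak` plus `Arc` split above, so that checking for the result and registering the waker happen under one lock.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};
use std::thread;

/// State shared by the task side and the join handle.
struct Shared<T> {
    result: Option<T>,
    join_waker: Option<Waker>,
}

/// Task side: call `complete` once with the task's output.
struct Completer<T>(Arc<Mutex<Shared<T>>>);

/// Handle side: a future that resolves to the task's output.
struct JoinHandle<T>(Arc<Mutex<Shared<T>>>);

fn join_pair<T>() -> (Completer<T>, JoinHandle<T>) {
    let shared = Arc::new(Mutex::new(Shared { result: None, join_waker: None }));
    (Completer(shared.clone()), JoinHandle(shared))
}

impl<T> Completer<T> {
    fn complete(self, value: T) {
        let waker = {
            let mut shared = self.0.lock().expect("poison");
            shared.result = Some(value);
            shared.join_waker.take()
        };
        // Wake only after the lock is released.
        if let Some(waker) = waker {
            waker.wake();
        }
    }
}

impl<T> Future for JoinHandle<T> {
    type Output = T;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
        let mut shared = self.0.lock().expect("poison");
        match shared.result.take() {
            Some(value) => Poll::Ready(value),
            None => {
                // Result check and waker registration share one lock, so a
                // concurrent `complete` cannot slip in between them.
                shared.join_waker = Some(cx.waker().clone());
                Poll::Pending
            }
        }
    }
}

/// Unparks the thread that is blocked in `block_on`.
struct ThreadWaker(thread::Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Minimal single-future executor, used only to exercise the handle.
fn block_on<F: Future + Unpin>(mut fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match Pin::new(&mut fut).poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(),
        }
    }
}
```

A spawn function built on this would wrap the user's future in an `async move` block that awaits it and then calls `completer.complete(out)`, which turns any future into a ()-producing one the executor can run, while the caller keeps the `JoinHandle`.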
