Is this a valid async executor implementation?

As a follow-up to @zireael9797’s question, I decided to look into what it takes to write an async executor; this is what I came up with. Does this look like a reasonable minimal implementation, or am I breaking a safety rule (or some other rule) somewhere?

Also, does anyone have some go-to basic async tests for this sort of thing?

mod blocking_future {

    use std::future::*;
    use std::task::*;
    use std::thread::*;

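    // clone: allocate a fresh Box<Thread> for the new waker; `p` is only borrowed.
    unsafe fn rwclone(p: *const ()) -> RawWaker {
        make_raw_waker(&*(p as *const Thread))
    }
    // wake: consumes the waker, so reclaim the Box and free it after unparking.
    unsafe fn rwwake(p: *const ()) {
        Box::from_raw(p as *mut Thread).unpark();
    }
    // wake_by_ref: borrow only. `unpark` takes `&self`, so a shared reference
    // is enough and avoids creating an aliased `&mut Thread`.
    unsafe fn rwwakebyref(p: *const ()) {
        (&*(p as *const Thread)).unpark();
    }
    // drop: reclaim the Box so the Thread handle is freed.
    unsafe fn rwdrop(p: *const ()) {
        drop(Box::from_raw(p as *mut Thread));
    }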

    static VTABLE: RawWakerVTable = RawWakerVTable::new(rwclone, rwwake, rwwakebyref, rwdrop);

    fn make_raw_waker(th: &Thread) -> RawWaker {
        let ptr = Box::into_raw(Box::new(th.clone())) as *mut Thread as *const ();
        RawWaker::new(ptr, &VTABLE)
    }

    pub trait BlockingFuture: Future + Sized {
        fn block(self) -> <Self as Future>::Output {
            let mut boxed = Box::pin(self);
            let waker = unsafe { Waker::from_raw(make_raw_waker(&current())) };
            let mut ctx = Context::from_waker(&waker);
            loop {
                match boxed.as_mut().poll(&mut ctx) {
                    Poll::Ready(x) => {
                        return x;
                    }
                    Poll::Pending => {
                        park();
                    }
                }
            }
        }
    }

    impl<F: Future> BlockingFuture for F {}
}

use blocking_future::BlockingFuture;

#[test]
fn test() {
    let f1 = async { 3 };
    let f2 = async { 4 };
    assert_eq!(7, async { f1.await + f2.await }.block());
}

(Playground)

Output:


running 1 test
test test ... ok

test result: ok. 1 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out


running 0 tests

test result: ok. 0 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out


Errors:

   Compiling playground v0.0.1 (/playground)
warning: unused import: `blocking_future::BlockingFuture`
  --> src/lib.rs:48:5
   |
48 | use blocking_future::BlockingFuture;
   |     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   |
   = note: `#[warn(unused_imports)]` on by default

warning: function is never used: `rwclone`
 --> src/lib.rs:7:15
  |
7 |     unsafe fn rwclone(p: *const ()) -> RawWaker {
  |               ^^^^^^^
  |
  = note: `#[warn(dead_code)]` on by default

warning: function is never used: `rwwake`
  --> src/lib.rs:10:15
   |
10 |     unsafe fn rwwake(p: *const ()) {
   |               ^^^^^^

warning: function is never used: `rwwakebyref`
  --> src/lib.rs:13:15
   |
13 |     unsafe fn rwwakebyref(p: *const ()) {
   |               ^^^^^^^^^^^

warning: function is never used: `rwdrop`
  --> src/lib.rs:16:15
   |
16 |     unsafe fn rwdrop(p: *const ()) {
   |               ^^^^^^

warning: static item is never used: `VTABLE`
  --> src/lib.rs:20:5
   |
20 |     static VTABLE: RawWakerVTable = RawWakerVTable::new(rwclone, rwwake, rwwakebyref, rwdrop);
   |     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

warning: function is never used: `make_raw_waker`
  --> src/lib.rs:22:8
   |
22 |     fn make_raw_waker(th: &Thread) -> RawWaker {
   |        ^^^^^^^^^^^^^^

warning: 7 warnings emitted

    Finished test [unoptimized + debuginfo] target(s) in 1.26s
     Running target/debug/deps/playground-1d70dea0e278014a
   Doc-tests playground

Yes, it appears correct. Note that you do not need to heap allocate the future:

fn block(mut self) -> <Self as Future>::Output {
    let mut boxed = unsafe {
        std::pin::Pin::new_unchecked(&mut self)
    };
    ...
}
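
For comparison, here is a minimal sketch of the same stack-pinning idea without any `unsafe`, assuming a toolchain where `std::pin::pin!` and the `std::task::Wake` trait are available. The names `ThreadWaker` and `block_on` are made up for illustration and are not from the original post. It still relies on `park`/`unpark`, so it does not address sharing of the thread's token.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Hypothetical waker type: unparks the thread that called `block_on`.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Drives `fut` to completion on the current thread.
fn block_on<F: Future>(fut: F) -> F::Output {
    // `pin!` pins the future to this stack frame, so no Box is needed
    // and no `Pin::new_unchecked` call appears in user code.
    let mut fut = pin!(fut);
    // `Waker::from(Arc<impl Wake>)` builds the vtable for us.
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            // May return spuriously; the loop simply polls again.
            Poll::Pending => thread::park(),
        }
    }
}
```

With this, the test from the original post would read `assert_eq!(7, block_on(async { f1.await + f2.await }))`.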

You should look at the implementation of futures::executor::block_on, along with its associated issues and history; it is much the same as your idea. However, some issues with lost wakeups due to race conditions were discovered over the years, and your implementation likely contains the same issues.
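
Wakeup races can only show up when the wake actually arrives from another thread, which a test built from futures like `async { 3 }` never exercises, because those complete on the first poll. As a rough sketch of a basic test future (the name `WakeFromThread` and the 10 ms delay are illustrative assumptions, not an established test suite), something like the following returns `Pending` once and is then woken from a helper thread:

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll};
use std::thread;
use std::time::Duration;

/// Hypothetical test future: stays pending until a helper thread sets `done`.
struct WakeFromThread {
    done: Arc<AtomicBool>,
    started: bool,
}

impl WakeFromThread {
    fn new() -> Self {
        WakeFromThread { done: Arc::new(AtomicBool::new(false)), started: false }
    }
}

impl Future for WakeFromThread {
    type Output = u32;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<u32> {
        if self.done.load(Ordering::Acquire) {
            return Poll::Ready(42);
        }
        if !self.started {
            self.started = true;
            let done = Arc::clone(&self.done);
            // Simplification: only the waker from the first poll is kept,
            // which is fine for a single-future executor like the one here.
            let waker = cx.waker().clone();
            thread::spawn(move || {
                thread::sleep(Duration::from_millis(10));
                // Publish the result before waking, so the next poll sees it.
                done.store(true, Ordering::Release);
                waker.wake();
            });
        }
        Poll::Pending
    }
}
```

Running many of these in a loop under the executor being tested, for example `WakeFromThread::new().block()` with the trait from the original post, gives a wakeup race a chance to appear as a hang.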

At a quick glance, the primary issue is that this code doesn’t have exclusive control of the thread’s token: if something else is also using park/unpark, the two token users can silently poison each other.

Looks like the most straightforward solution would be to ditch park/unpark. Instead, lock the Future in a Mutex and use a private Condvar for signaling.
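
One way the Condvar suggestion might look is sketched below. It is a variation rather than a literal reading of the reply: instead of locking the future itself in a `Mutex`, it keeps a private "woken" flag in the `Mutex` and pairs it with a `Condvar`, so nothing else in the program can consume or forge the signal. The names `Signal` and `block_on` are hypothetical, and the sketch assumes `std::pin::pin!` and `std::task::Wake` are available.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::{Arc, Condvar, Mutex};
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical private signal: a flag guarded by a Mutex, plus a Condvar.
struct Signal {
    woken: Mutex<bool>,
    cond: Condvar,
}

impl Wake for Signal {
    fn wake(self: Arc<Self>) {
        self.wake_by_ref();
    }

    fn wake_by_ref(self: &Arc<Self>) {
        // Set the flag under the lock so a wake that arrives between
        // `poll` and `wait` is recorded rather than lost.
        *self.woken.lock().unwrap() = true;
        self.cond.notify_one();
    }
}

/// Drives `fut` to completion on the current thread.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let signal = Arc::new(Signal {
        woken: Mutex::new(false),
        cond: Condvar::new(),
    });
    let waker = Waker::from(Arc::clone(&signal));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
        // Sleep until a waker sets the flag; the `while` loop absorbs
        // spurious Condvar wakeups.
        let mut woken = signal.woken.lock().unwrap();
        while !*woken {
            woken = signal.cond.wait(woken).unwrap();
        }
        // Reset the flag before the next poll.
        *woken = false;
    }
}
```

Because the flag and the Condvar belong to this `block_on` call alone, unrelated `park`/`unpark` calls on the same thread cannot interfere with it.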
