How to wait an async in non-async function?

Ah I see, so the only way for wait future to complete is poll it, right?

Previously I was curious on how Rust can have runtime owned async worker tasks. But now I understand, it's done by the thirdparty thread pool.

So I also realize there is a futures::future::Future, is this deprecating now?

1 Like

In fact, it is the only way for an asynchronous task to make progress at all in Rust's poll-based future model. You may find this classic blog post interesting if you want to study this model more.

The Future trait is staying with us as the standard interface to the opaque state machine types returned by async fns. Much like the Fn/FnMut/FnOnce traits allow us to manipulate opaque closure types.

However, this trait is migrating into the standard library as std::future::Future, because with async fn, the language now needs to have built-in support for futures. futures::future::Future will likely remain around for backwards compatibility, but become a mere reexport of std::future::Future.

I think the Future trait has received slight API changes during the standardization process, but the core ideas remain the same.

3 Likes

That blog post is really helpful !

I see, a Future is more like a continuation, rather than promise in JavaScript. And basically we can do small step evaluation on the Future continuations. By doing so, if a single thread handles multiple Future s, it's actually having multiple "green threads".

My final question is, is there any documentation for how to poll the future or select multiple futures with only standard library ? I suppose there should be some way to do this.

2 Likes

Given that futures are a very recent addition to the standard library which hasn't even reached stable (it's targeted for rust 1.36), I'm not sure if there is an authoritative guide on how to manipulate std futures "by hand" yet. Especially as the general expectation is that most people will not want to dive into the internals of futures, but will just feed them to a compatible runtime without an extra thought.

However, the design of std Futures is quite close to that of v0.3 of the futures-preview crate. So you may be able get a good picture of how it all works by studying the futures-preview crate's documentation, the source code of futures-preview and higher-level infrastructure like romio and juliex, and the huge body of community discussion that went on as std::future::Future was being stabilized.

3 Likes

Ah, find an good example in futures-preview crate. https://github.com/rust-lang-nursery/futures-rs/blob/4980af0a236fde4c2c8609987c948f7bd4b51bed/futures-executor/src/local_pool.rs#L65

Thank you so much!

1 Like
fn run_future<T:std::future::Future>(mut task: T) -> T::Output {
    let thread_data = (thread::current(), RawWakerVTable::new(
        clone_waker,
        wake_waker,
        wake_waker,
        drop_waker,
    ));

    let waker = unsafe {
        Waker::from_raw(
          clone_waker(&thread_data as *const _ as * const ())
       )
    };

    let mut ctx = Context::from_waker(&waker);

    return loop {
        println!("event loop iter!");
        match std::future::Future::poll(unsafe{ 
               std::pin::Pin::new_unchecked(&mut task) 
        }, &mut ctx) {
            Pending => { thread::park(); }
            Ready(val) => { break val; }
        }
    };
}

async fn get_value() -> i32 {
    // How can I make task yield the thread at this point ?
    return 1;
}

async fn foo() -> i32 {
    let ret = get_value().await + get_value().await;
    return ret;
}

fn main() {
    println!("{}", run_future(foo()));

}

More question, I tried to implement a tiny 1:1 task loop run future.
But I realize the .await keyword doesn't trigger the task yield the control of the thread.

So the output is

event loop iter!
2

My guess is there should be some thing else in the API that can makes an async function yield the control.

So I am wondering if there's such thing can make the function yield. For example, if we are implementing an async sleep, then my guess is we should add the context to a event queue and then what's the next step?

1 Like

A future yields control by scheduling itself to be woken up later on and returning Pending from poll(). The "scheduling" part is done using the Waker argument that the Future::poll() implementation receives as a parameter. This waker should be cloned and handed over to whatever part of the codebase is in charge of waking up the Future later on.

Unfortunately, there is currently no syntax for retrieving the Waker or returning Pending from poll() when implementing a Future using the async fn syntax. So you'll need to implement the Future trait manually if you want to explore this part of the design further.

I believe the rationale is that asynchronous IO implementations are still meant to implement futures manually, and async/await is only intended as a convenience for end-users of these libraries who need to compose Futures with each other.

Thanks a lot! You already answered my question.

I can see the reason why higher level code won't need the yield semantics, but I am curious about how the IO is handled under the async IO API layer. And I agree yielding on every await is way too expensive.

Just as you said, what I need to do is implement a Yield type that returns Pending first and then Ready. That's good enough to me, thank you again!

enum YieldFuture {
    ToYield,
    Yielded,
}

impl std::future::Future for YieldFuture {
    type Output = ();
    fn poll(mut self: std::pin::Pin<&mut Self>, ctx: &mut Context) -> std::task::Poll<()> {
        match *self {
            YieldFuture::ToYield => {
                *self = YieldFuture::Yielded;
                return Pending;
            },
            YieldFuture::Yielded => {
                return Ready(());
            }
        }
    }
}

async fn get_value() -> i32 {
    YieldFuture::ToYield.await;
    return 1;
}
2 Likes

This won't work generally. For a future to be woken up, it has to tell the waker to wake it up when it's ready. So you must register with whatever task you're waiting for to call Waker::wake at some point in the future (which I assume for this minimal reactor is a thread::unpark).

Yes, with my previous main loop, it hangs forever. In fact, in the dispatching loop, I don't need thread::park.
If there's only thread, if the thread is sleeping, there's no way to wake it again.

So I think if it's a single threaded future runner, then there won't be any parking. And my guess is for the IO purpose, we should have epoll called in the loop when Pending is returned, rather than simply parking.

See futures::executor::block_on, which does exactly this (runs a future on the thread in a blocking manner):

pub fn block_on<F: Future>(f: F) -> F::Output {
    pin_mut!(f);
    run_executor(|cx| f.as_mut().poll(cx))
}

fn run_executor<T, F: FnMut(&mut Context<'_>) -> Poll<T>>(mut f: F) -> T {
    let _enter = enter()
        .expect("cannot execute `LocalPool` executor from within \
                 another executor");

    CURRENT_THREAD_NOTIFY.with(|thread_notify| {
        let waker = waker_ref(thread_notify);
        let mut cx = Context::from_waker(&waker);
        loop {
            if let Poll::Ready(t) = f(&mut cx) {
                return t;
            }
            thread::park();
        }
    })
}

It's pretty much expected that if you're doing basically anything with futures other than writing an async fn that serially awaits other async fn/fn -> impl Future you'll need at least futures_core and probably futures_util. But these aren't (intended to be) public dependencies, so they can stay out of std for now and iterate outside std's stability guarantees.

I do, however, expect a decent amount of futures to graduate at some point to at least a std-like situation where they're shipped with the standard library. For now in 0.3-alpha-land, however, it's still being figured out.

6 Likes

But my question is, if there's only one thread for the program. If the thread is put into parking, then it seems there's no way to wake it anymore. Right ?

So I just get a little confused what if the run_executor just blocked because the poll function returns a Pending state. It looks like we should have another thread call unpark on it, otherwise the future will sleep forever.

1 Like

It seems to me that the thread::park() strategy only works if there is a separate IO thread handing out notifications. Without that you must replace thread::park with something that directly waits for IO events (maybe epoll or similar).

The impl Future::poll must register the wakup call. If it doesn't, the future will never be polled again. If Waker::wake isn't called, your Future will not wake back up. (Though a spinloop poll is a valid implementation as well. The "won't be called" is more a "MAY not be called", as spurious wake-ups are allowed.)

If your OS only provides blocking IO, you need to have a worker thread spun up to do the work and then call wake. For epoll, I believe the ideal situation is to have a single epolling thread that blocks for an epoll event then wakes the correct Waker based on a synchronized epoll event => Waker mapping.

(Disclaimer: I do not know how any low level asynchronicity actually works.)

This contract of Future-registers-its-own-wake is (part of) why Rust futures don't work to implement semicoroutines (generators). (Semicoroutines are enough to implement cooperative asynchronicity.)

1 Like

But if this is the case, it sounds like Future is only desgined for async IO. Rather than staff like lightweight context swith or N:M concurrency? For example what if I need to run two tasks with a single thread to avoid expensive OS context switch?

1 Like

I'm exploring this very use case at the moment, and yes, Futures are not the tool you want there, because they cannot yield control without falling asleep (so to speak).

Generators (which remain unstable for now) would be more appropriate for this purpose.

1 Like

Ah, I see. So you are using Generator based rather than Future based approach ?

1 Like

Right now, I'm going for a library-based approach that works on stable, so I have a trait with an execute() method that returns an enum with variants for completion, non-blocking yield, and blocking yield (and maybe errors too in the future, still need to figure out if I need it or if it can be bundled with another outcome).

In a way, that's similar to the Future trait, but not quite it because 1/I do not return results at the end and 2/I can yield without blocking.

But this library-based approach does not allow borrowing across yield points, which leads to the well-known usability problems of manual futures without async fn (need to put everything in Rc/Arc, etc.). So if I switched to nightly, I would use generators instead, which IIRC address this problem now.

My general point of view is that working with nightly is a bit of a pain because of the periodic breakages, so I avoid doing so at this early research stage. Hopefully, by the time where I'll really want generators, they will finally have landed on stable.

In theory, you can implement games-style coroutines (cooperative multi-frame yielding multitasking) with just Future.

To translate Unity's coroutine yields:

yield return null => next_update().await
yield return new WaitForFixedUpdate() => next_fixed_update().await
yield return new WaitForEndOfFrame() => end_of_frame().await
yield return new WaitForSeconds(..) => seconds(..).await
yield return new WaitForSecondsRealtime(..) => seconds_realtime(..).await
yield return coroutine_handle => coroutine_handle.await
yield return new WaitUntil(() => ..) => wait_until(|| ..).await
yield return new WaitWhile(() => ..) => wait_while(|| ..).await

The described futures here would find the main loop global and register their callback in a priority queue. (In Unity, the YieldInstruction is yielded from the generator to the main loop which does that bookwork.)

The main loop would then do main loop things, and check the task priority queue at the right times to advance the waiting and wake/poll those who's time it is to wake up.

Probably the minimal "proper" setup would have an (optionally synchronized) task thread (pool) for running the tasks and a main thread in charge of the main loop, but the main loop can also drive the futures directly if you don't care about interopability of the "wait for game event" futures with other reactors. (Actually, you could add a small layer of indirection through Wake::wake to make it work anyway: the priority queue is just for advancing the waits and waking the tasks, Wake::wake sets a flag that the coroutine needs to be polled, and polling the futures is a separate step from waking them. Full reactor agnosticism!)

So in TL;DR: your reactor controls the waker. Make a waker that sets a "this future needs to be polled" flag and alternate between polling futures while they need to be polled and making progress on awaited tasks.

4 Likes

This topic was automatically closed 90 days after the last reply. New replies are no longer allowed.