Simplest possible block_on?

Hi! As I'm learning about async Rust, I read Build your own block_on() by @stjepang, and Applied: Build an Executor from "Asynchronous Programming in Rust." There's quite a bit of nuance in even these didactic implementations. What I'd like to know is: what is the simplest possible implementation of block_on that will still drive any correct future to completion?

To make this easier, I'm thinking to add a few restrictions:

  • The implementation does not need to care at all about performance.
  • The implementation only needs to work in a single-threaded environment.
  • The implementation can assume it's the only executor on the system, if that helps.
  • The implementation doesn't need to support any kind of "spawn" functionality.

In this case I'm really only interested in a solution that's technically correct. The original motivation for this has been struggling to find a correct executor that will run on an embedded system.

Here’s my attempt from a few months ago. It should be correct as long as nothing else attempts to park or unpark the main thread.


Edit: I just cleaned this up to be more correct, simpler, and significantly less performant by ignoring the Waker and spin-waiting instead:

Playground

mod blocking_future {

    use std::future::*;
    use std::task::*;

    const PENDING_SLEEP_MS:u64 = 10;

    unsafe fn rwclone(_p: *const ()) -> RawWaker {
        make_raw_waker()
    }
    unsafe fn rwwake(_p: *const ()) {}
    unsafe fn rwwakebyref(_p: *const ()) {}
    unsafe fn rwdrop(_p: *const ()) {}

    static VTABLE: RawWakerVTable = RawWakerVTable::new(rwclone, rwwake, rwwakebyref, rwdrop);

    fn make_raw_waker() -> RawWaker {
        static DATA: () = ();
        RawWaker::new(&DATA, &VTABLE)
    }

    pub trait BlockingFuture: Future + Sized {
        fn block(self) -> <Self as Future>::Output {
            let mut boxed = Box::pin(self);
            let waker = unsafe { Waker::from_raw(make_raw_waker()) };
            let mut ctx = Context::from_waker(&waker);
            loop {
                match boxed.as_mut().poll(&mut ctx) {
                    Poll::Ready(x) => {
                        return x;
                    }
                    Poll::Pending => {
                        std::thread::sleep(std::time::Duration::from_millis(PENDING_SLEEP_MS))
                    }
                }
            }
        }
    }

    impl<F:Future + Sized> BlockingFuture for F {}
}
1 Like

Awesome, thanks for posting this! I ended up coming up with basically the same solution, except I used this crate cooked-waker because I was scared of the unsafe code with RawWaker. My solution looks like this:

use std::task::{Context, Poll};
use std::future::Future;
use cooked_waker::{IntoWaker as _, WakeRef};

struct NoopWaker;

impl WakeRef for NoopWaker {
    fn wake_by_ref(&self) {
        // It's okay to do nothing only because block_on continually polls
    }
}

fn block_on<F: Future>(future: F) -> F::Output {
    pin_utils::pin_mut!(future);
    loop {
        if let Poll::Ready(output) = future.as_mut().poll(&mut Context::from_waker(&NoopWaker.into_waker())) {
            return output
        }
    }
}

N.B. for anyone who stumbles onto this thread: while it is simple, making the waker a no-op is not likely what you want, because continually polling a future is exactly what Rust's async ecosystem is designed to avoid. Not only does this require 100% CPU utilization while doing no work, but I think it could also cause a future to build up a really huge set of wakers, which could be trouble for memory consumption.

3 Likes

According to the spec, Futures are only supposed to notify the waker from their mos recent poll call. There’s no actual enforcement for that, though, and I don’t know how many real-world implmentations follow that part of the spec.

Oh, thanks for pointing that out! I did not know that. Also, I realized that we're always giving it the same waker, so as long as it's storing the waker in a set-type data structure I think it should be okay.

1 Like

This version seems to resolve some of my run-away issues related to
using "futures::executor::block_on" in a tokio::runtime...
Old version:

pub fn await_future<F: Future>(f: F) -> F::Output {
    futures::executor::block_on( async { f.await})
}

New:

mod blocking_future {

// https://users.rust-lang.org/t/simplest-possible-block-on/48364/2
    // use cooked_waker::{IntoWaker as _, WakeRef};
    use std::future::*;
    use std::task::*;

    unsafe fn rwclone(_p: *const ()) -> RawWaker {
        make_raw_waker()
    }
    unsafe fn rwwake(_p: *const ()) {}
    unsafe fn rwwakebyref(_p: *const ()) {}
    unsafe fn rwdrop(_p: *const ()) {}

    static VTABLE: RawWakerVTable = RawWakerVTable::new(rwclone, rwwake, rwwakebyref, rwdrop);

    fn make_raw_waker() -> RawWaker {
        static DATA: () = ();
        RawWaker::new(&DATA, &VTABLE)
    }
    
    pub trait BlockingFuture: Future + Sized {
        fn await_future(self, timeout: std::time::Duration) 
            -> Result<<Self as Future>::Output, String>  
        {
            let mut boxed = Box::pin(self);
            let waker = unsafe { Waker::from_raw(make_raw_waker()) };
            let endtimes = tokio::time::Instant::now() + timeout;
            let mut ctx = Context::from_waker(&waker);
            loop {
                match boxed.as_mut().poll(&mut ctx) {
                    Poll::Ready(x) => {
                        return Ok(x);
                    }
                    Poll::Pending => {
                        if endtimes < tokio::time::Instant::now() {
                            super::error!("Tokio::await Timeout: The end is neigh... ");
                            return Err("Timeout".to_string());
                        }
                        let _ = tokio::task::yield_now();
                    }
                }
            }
        }
    }

    impl<F:Future + Sized> BlockingFuture for F {}
}
pub fn await_future<F: Future>(f: F, failure: F::Output) -> F::Output {
    let fut = 
        blocking_future::BlockingFuture::await_future(  async {f.await}, 
        std::time::Duration::from_secs(30));
    match fut
    {
        Ok(val) => 
            val,
        Err(_err) => {
            failure.into()
        }
    }
}

Further improvement would be :

  • take the timeout as a closure that returns the Success/Failure the same as
    the Future... that can terminate the wait
  • switch to CookedWaker

...

@janrune So wait, you just add a timeout to the block_on future? That seems like a real bad idea. There's a reason that Tokio makes its block_on panic when used inside a runtime.

@alice...

Well...
Unfortunately I have not found another option...
I agree, it's not a good option, but without a good example of how to do it "correctly"...
Please point me to it, if you know of one...

I've been digging for a solution to the issue of going from non-async code to async within tokio.
I would love to see an example of how to do it "correctly".

I was using futures::block_on, but found a posting where the you and others were stating
that using the futures::block_on was a really bad idea.

So, when I had a runaway process I had to look for the alternate correct version.

Optimally, there would be a method/macro:
tokio::await_async<F: Future>(future: F) -> F::Output
or
tokio::await_async<F: Future>(future: F, termination_pred: Fn()) -> F::Output

as an example...

...
pub async fn fetch_stuff(cli: reqwest::Client, req: reqwest::Request) -> Result<Result, Error>
{
    cli.execute(req).await
}

pub fn fetch_stuff_sync(cli: reqwest::Client, req: reqwest::Request) -> Result<Result, Error>
{
  await_future(async{fetch_stuff_sync(cli, req).await}, 
      Error::Timeout("Exec Failed".to_string())
}
...

Preferably It would be:

pub fn fetch_stuff_sync(cli: reqwest::Client, req: reqwest::Request) -> Result<Result, Error>
{
  tokio::await_future(async{fetch_stuff_sync(cli, req).await}, 
      Error::Timeout("Exec Failed".to_string())
}

If you have the "Correct" solution or references to code that does this...
I would be ecstatic ... and super appreciative.
Any constructive criticism is appreciated
JR

P.S. I'm stuck on tokio:0.2.22 for a while, until some of the libraries I use catch up...

Well since you are running into problems, your fetch_stuff_sync method must necessarily have an async method somewhere above it in its callstack. To fix it, you must do something when transitioning from async to sync code. If you forget to do this transition, you run into trouble when you later try to transition back from sync to async.

Consider this example:

async fn some_async_fn() {
    some_sync_fn_1();
}

fn some_sync_fn_1() {
    some_sync_fn_2();
}
// .. maybe many layers of sync calls
fn some_sync_fn_2() {
    fetch_stuff_sync(...);
}

The issue above is cannot be fixed by changing the code in some_sync_fn_2. The fix must happen in some_async_fn by using spawn_blocking like this:

use tokio::runtime::Handle;

async fn some_async_fn() {
    tokio::task::spawn_blocking(move || {
        let handle = Handle::current();
        some_sync_fn_1(handle);
    }).await;
}

fn some_sync_fn_1(handle: Handle) {
    some_sync_fn_2();
}
// .. maybe many layers of sync calls
fn some_sync_fn_2(handle: Handle) {
    // now block_on is perfectly fine
    handle.block_on(fetch_stuff(...));
}

With the example above, calling block_on is perfectly fine because you are inside spawn_blocking.

The call to Handle::current() can also happen in some_sync_fn_2 if you are sure it is always called from inside a spawn_blocking, but the above is more robust.

@alice,

Thanks for responding...

Given that the sync/async transition may happen at any point in the chain,
If I read your reply correctly, any time you have a mixed sync/async call hierarchy,
you should start it with a "spawn_blocking"...

Unfortunately, a lot of the libraries may make the transition necessary,
but in most cases it goes from sync to async...
Which means there will be more of case 2

pub async fn fetch_stuff(stuff_params, ...)

pub fn fetch_some_stuff(stuff_params, ...) {
    // prep params for stuff
...
    Handle::try_current()?.block_on(fetch_stuff(stuff_params, ...));
...
}

or some variation of it.

Just to clarify...
Avoid: futures::executor::block_on( async { resp.text().await})

When in a context of "tokio::spawn_blocking(...)"
Handle::try_current()?.block_on(fetch_stuff(stuff_params, ...));
Is OK... (Though perhaps not optimal...)

If I remember correctly, Handle is going away in v0.3...
What is the equivalent approach for v0.3 ?

After a long night of trying....And being blocked on all corners.
(With a fresh family of dents in the walls in the office..)

I have to find a way to spawn_blocking on a closure while in the context of
tokio::runtime.block_on()
Not by choice, but based on the constraints of the crates I'm already using...

The Abscissa crate spawns the runner with a fresh Runtime with a
block_on(future)...
There is no way of replacing the runtime with a pre-constructed, without digging deep...
But, I've not found a way to spawn_blocking and pass the runtime, which it needs...
Unless I create a global Temp and pass it that way, not very elegant...

I tried to get the current handle and spawn_blocking from there.
But, since I'm in a &mut self, that does not work... Unless you know a trick around that one.

At this point, it looks like I have to create a whole new runtime and pass it.

Again, I really appreciate the assistance and insight.
JR

What crate are you using that is making it difficult? Can you elaborate on the &mut self issue?

As for Handle, we will probably add it back.

Perhaps you can come with a few examples that are forcing you into this corner, and I can try to help you get around the issues?

@alice,
Thanks for responding...

I'll have to try to recreate it...
I've trashed a lot of approaches...
It will take a little bit...

Most of the walls were related to not being able to spawn_blocking on a future...
I tried a lot of ways to wrap it the ways I know how, but no dice...

in abscissa::tokio

pub fn run<A, F>(app: &'static AppCell<A>, future: F) -> Result<F::Output, FrameworkError>
where
    A: Application,
    F: Future,
{
    take_runtime(app).map(|mut runtime| runtime.block_on(future))
}

I've tried to create variations of this one
(This is just to illustrate, not close to working or compiling)

pub fn spawn_blocking<A, C>(app: &'static AppCell<A>, closure: C) -> ()
where
    A: Application,
    C: FnOnce() +Send+ 'static,
{
    take_runtime(app).map(|mut runtime| async {
        let join_handle = 
        runtime.handle().spawn_blocking(closure);
        let _ = join_handle.await;
    });
}

It failed with various issues around lifetime and send

I tried it from the other end

    // current "operational"
    let _join_handle = 
        std::thread::Builder::new().name("abscissaMain".to_owned()).spawn(move || {
//        abscissa_core::Application::run(&APPLICATION, args);
            //info!("{} service starting", config::SVC_NAME);
            info!("{} service starting alt_boot(&APPLICATION, [{:?}])", config::SVC_NAME, args);
            alt_boot(&APPLICATION, args);
        }
    );

Which creates the RT and starts the the whole show.

I created a new runtime and spawned_blocking()...
But I could not pass the runtime into spawned thread.
I tried to get the handle, set the rt, then spawn...
borrow checked complained.

The transition of control from abscissa to my app happens:
(Which calls the runner up top)

impl Runnable for AgentCmd {
    /// Start the application.
    fn run(&self) {
        let _ = abscissa_tokio::run(&APPLICATION, 
            async{
                let (cmd, mut todo_list, tgt_srvr) = 
                    match self.get_cmd(&mut app_writer()) 
                    {
                        Ok(vals) => vals,
                        Err(err) => {
                            error!("Initialization failed: {:?}", err);
                            return;
                        },
                    };
                if todo_list.len() < 1 {
                    error!("Nothing to do, exiting");
                    return;
                }
                let upload_interval = 
                    Duration::from_secs(cmd.upload_interval.unwrap_or(60));
                info!("[v:{}], Starting agent Loop Verbose={}", env!("CARGO_PKG_VERSION"), self.verbose);
                loop {
                    APPLICATION.read().get_handler_controller().read()
                        .handle_tasks_until(&mut todo_list, upload_interval).await;
                    if !cmd.dump_data(&mut todo_list, &tgt_srvr) {
                        info!("AgentCmd: No user, not updating until new user logged in");
                        let mut interval = tokio::time::interval(Duration::from_secs(60));
                        {   interval.tick().await;}
                    }
                }
            }
        );
    }
}

I changed it to call abscissa_tokio::spawn_blocking instead of abscissa_tokio::run...
Which is where the problem of passing self blocked me...

The best solution, from my standpoint, would be to have:

   tokio::await_future<F:Future>(future: F) -> F::Output

that would handle the transitions...
What do you think the likelihood of that is ?

Again, I really appreciate the insight...
I love rust... but it can be... let's say "challenging" at times.
JR

Instead of "Handle"...
I'd vote for :sunglasses:

tokio::runtime::get_current() ->&Runtime 

Or some variation there of, It seems more versatile, but probably also fraught with dangers...
While on the subject...
Is there a way to get an iterator over the threads and get some run stats ?
That would have been gold when I was trying to debug the runaway thread.
It was a combination of Superluminal(https://superluminal.eu/) and Visual Studio
that ID'd it in the end.
I had to run the app as a windows service, wait for the issue to crop up.
then pull it up in Visual Studio...

I have to admit that it's very unclear to me what's going on here, or what you're trying to do. I try to answer specific comments below, but in general I am very confused about how the many things you mention fit together.

You use spawn_blocking to transition from async to sync, so it doesn't make sense to call it on a future.

It seems like you're building your own spawn_blocking that somehow wraps the async block in a synchronous interface. That defeats the purpose of spawn_blocking which is to use it in async code.

As for passing a Handle to another thread, you will need to clone it first. Then it should work.

Not directly. If you want information about the things happening on Tokio, you should look into the tracing crate.

It is impossible to write such a function.

@alice,

I'll try to make it a bit clearer...

I'm making an application that gathers data from different sources and ships it off for storage.
Each one of the different measurements are on a different system which I have to access using different crates.
Some are sync, some async, some use tokio, some not...

so, I have constraints on both sides..
In order to get around the transitions, I would have to make all my structs and traits async...
But, there are still some issues around async traits... right (dtolny made a macro for it though..)

Which would solve some of the issues, but would still be left with transitions for all the
sync crates. Which would then have to be spawn(...).await...

Which is how we wind up with

futures::executor::block_on(...)

which "fixes" the transition issue.

Without a similar alternative in the tokio crate,
there will be a lot of people reaching for the handy "futures::executor::block_on"...
It works!! sort of... until you have runaway threads, due to missed responses and an endless loop in the poll method..

Just to make sure, I'm not arguing ... just explaining how I got here...
And there will be a lot more people hitting the same trap.

So, philosophy aside, what is the correct way of handling the transition...
assuming "you wake up in a cave.. you're are trapped.. all you know is"

  • you're in a tokio runtime
  • you're not running in a spawn_blocking context
  • you're in a sync function
  • all you have is a future
  • you need to await the result

What is the correct way to "get out of it", except reaching for the "futures::executor::block_on"
And what will be the correct way in v0.3+

I really do appreciate you taking the time to work with me on this...
JR

I did not read all of it closely. Will respond more once I get back from sports practice.

With just that? There is no correct way to do it. For there to be a correct way, you need the following additional assumptions:

  • The runtime is multi-threaded.
  • There is no join!, select!, FuturesUnordered or similar above you in your callstack.

then you can use tokio::task::block_in_place followed by Handle::current and handle.block_on. However be aware that block_in_place is a footgun and bad practice.

:sunglasses:

@alice,

I got a bit further, but hit another wall...
With a really passing the rt via global runtime... (I know, but its a test)

             match tokio::runtime::Builder::new()
                .enable_all()
                .thread_name(name.unwrap_or("Global-RT"))
                .build()
...
    let to_spawn = move || {
                    info!("{} service starting alt_boot(&APPLICATION, [{:?}])", config::SVC_NAME, args);
                    alt_boot(&APPLICATION, args);
                };
    let (_join_handle_tokio, _join_handle_std) =
    if let Some(mut rt) = crate::application::get_global_runtime(Some("abscissaMain"))
    {
        let ret = rt.handle().spawn_blocking(to_spawn);
        crate::application::release_runtime(rt);
        (Some(ret) , None)
    } else {
        match std::thread::Builder::new().name("abscissaMain".to_owned()).spawn(to_spawn) {
            Ok(val) => {
                (None, Some(val))
            },
            Err(err) => {
                error!("Svc.main: Failed to spawn thread: {:?}", err);
                return 666;
            },         
        }
    };

When I spawn_blocking, one of the libraries (trust-dns) panics with

The application panicked (crashed).
Message:  spawning not enabled for runtime
Location: C:\Users\JR\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-0.2.22\src\runtime\spawner.rs:29

So it seems the spawn_blocking thread is limited...

Hmmm... walls...

You need to choose either the basic_scheduler or threaded_scheduler when building the runtime. Otherwise you cannot spawn on it.