Sync function invoking async

I have a scenario where I have a standard (synchronous) function that needs to invoke an async function and return a value. I know that it is best practices to use async up/down the call stack where needed but due to other API constraints that is not an option in my case. I am using the latest version of tokio and my code looks similar to the code below. It looks like the async block in handle.spawn is never executed which is why I never am able to read a result from my channel - the code always panics with no value received. I know there has to be a way around this but I am stumped.

pub fn foo_sync(key:& 'static str) -> Result<String, Box<dyn std::error::Error + Send + Sync + 'static>> {
    if let Ok(handle) = tokio::runtime::Handle::try_current() {
        //This sync function is being invoked from an async context so use the handle to spawn a task
        // Signal main thread when task has finished.
        let (send, mut recv) = tokio::sync::oneshot::channel();
        handle.spawn(async move {
            //The line below never runs
            println!("*** future starting!!");
            let res = foo_async(key).await;
            send.send(res).unwrap();
        });
        for _ in 0..10 {
            match recv.try_recv() {
                Ok(val) => return val,
                // The channel will never receive a value.
                Err(tokio::sync::oneshot::error::TryRecvError::Closed) => {
                    panic!("channel unexpected closed");
                }
                _ => {
                    //wait
                    std::thread::sleep(std::time::Duration::from_millis(30));
                }
            }
        }
        //Return an error if no result received
        panic!("no value received");
    } else {
        //Create a new runtime and run the function
        let res = tokio::runtime::Builder::new()
                .basic_scheduler()
                .enable_all()
                .build()
                .unwrap()
                .block_on(foo_async(key));
        return res;
    }
}

pub async fn foo_async(key: &'static str) -> Result<String, Box<dyn std::error::Error + Send + Sync + 'static>> {
    Ok("a new string".to_string())
}
3 Likes

handle.spawn returns a JoinHandle which is a future you need to await, but unless I misunderstand something important you can solve this in a much easier way by simply doing something like this instead:

pub fn foo_sync(key:& 'static str) -> Result<String, Box<dyn std::error::Error + Send + Sync + 'static>> {
        let future = async move {
            println!("*** future starting!!");
            let res = foo_async(key).await;
            res
        };
        
        let res = tokio::runtime::Builder::new()
                .basic_scheduler()
                .enable_all()
                .build()
                .unwrap()
                .block_on(future);
        res
}

pub async fn foo_async(key: &'static str) -> Result<String, Box<dyn std::error::Error + Send + Sync + 'static>> {
    Ok("a new string".to_string())
}

Playground link

1 Like

Have you tried just calling handle.block_on(foo_async(key)). That seems more straightforward.

2 Likes

When I change the code to use handle.block_on(foo_async_key)) it compiles but when it runs I get this panic: Cannot start a runtime from within a runtime. This happens because a function (like block_on) attempted to block the current thread while the thread is being used to drive asynchronous tasks.

1 Like

When I modified the code to use your example I also get:

Cannot start a runtime from within a runtime. This happens because a function (like `block_on`) attempted to block the current thread while the thread is being used to drive asynchronous tasks.

I am testing these function with two different unit test methods, one with the attribute #[test] and one with the attribute #[tokio::test] This is to ensure that foo_sync will operate correctly in with an existing tokio runtime or without a tokio runtime being started. In my case, foo_sync is a low-level library function and it will be used in higher level applications, where some are async applications and some are traditional synchronous applications.

What you're doing is turning the async operation into a synchronous (blocking) one, and so doing this within an async framework is at least as bad as doing synchronous IO. At best, it'll use up one of the async framework's threads, and at worst, using up too many threads could cause the framework to cease to make progress, leading to a complete deadlock. This is why tokio errors here, to prevent this situation.

To fix this, I'd recommend either not calling the sync method from within an async framework, or if you do, using tokio::task::spawn_blocking() to call it. Then it'll be treated the same as regular, blocking IO and you should be all good.

If your high level application uses the sync method as a primitive, then I think by definition it would also then be a blocking / synchronous IO call, and should thus always be called via spawn_blocking anyways.

You simply can't turn a sync function into an async function. There's no way to do it. Tokio will panic in block_on if you try, and if you cheat it with thread::sleep (or any other method of blocking), you risk having everything else running on the runtime at the same time pause until foo_sync returns, and this includes your spawned task by the way, which would cause your entire application to deadlock.

@daboross' recommendation about spawn_blocking is correct. If your async function puts the call to foo_sync inside spawn_blocking, then there's no issue and block_on should just work.

2 Likes

OK, so I didn't get that you might have a runtime running already. If you for some reason want to just rework your exact example to run both if it's called within an existing tokio context.

Now you can get this to work if you rely on Tokio's threaded scheduler since spawn will start running the future on a different thread. But it's a brittle work around since you'll have to rely on the "exisiting" runtime to be threaded for this to work. This could probably even break in future releases of Tokio since AFAIK there is no guarantee that it will always work this way.

I don't know the details of your problem, so I just post the solution here even though I would try hard to rewrite this some other way.

Rewritten original example
use tokio; // 0.2.21
use std::thread;
pub type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync + 'static>>;

pub fn foo_sync(key: &'static str) -> Result<String> {
    let (send, mut recv) = tokio::sync::oneshot::channel();
   
    
    let future = async move {
        println!("*** future starting on thread {}!!", thread::current().name().unwrap_or("N/A"));
        let res = foo_async(key).await;
        send.send(res).unwrap();
     
    };
    if let Ok(handle) = tokio::runtime::Handle::try_current() {
        handle.spawn(future);
    } else {
        //Create a new runtime and run the function
        let _res = tokio::runtime::Builder::new()
            .basic_scheduler()
            .enable_all()
            .build()
            .unwrap()
            .block_on(future);
    }

    for _ in 0..10 {
        match recv.try_recv() {
            Ok(val) => return val,
            // The channel will never receive a value.
            Err(tokio::sync::oneshot::error::TryRecvError::Closed) => {
                panic!("channel unexpected closed");
            }
            _ => {
                //wait
                std::thread::sleep(std::time::Duration::from_millis(30));
            }
        }
    }
    //Return an error if no result received
    panic!("no value received");
}

pub async fn foo_async(key: &'static str) -> Result<String> {
    Ok("a new string".to_string())
}

To test this you can run this in a main function that starts a threaded runtime:

fn main() {
    let mut rt = tokio::runtime::Builder::new()
            .threaded_scheduler()  // <-- NB!!
            .enable_all()
            .build()
            .unwrap();
    let res = rt.block_on(async {foo_sync("Hello")}).unwrap();
    println!("{}", res);
}

Here is a link to the Playground. As you'll see I added a printout of what thread the future runs on so you'll see the difference. In the playground link there is both main_old where it calls foo_sync outside a tokio context, and the main I added wich calls it within an exisisting tokio context. I'm not sure what kind of runtime it is called in when used with #[tokio::test] but I guess that could be configured somehow.

@LukeMauldin I somehow replied to my own comment, but the intention was to reply to Sync function invoking async - #5 by LukeMauldin.

Edit

My first example didn't work. I didn't realize that basic_scheduler wasn't threaded, but I added explanation for it now and an example to show how it works.

There is no guarantee that this will work on the threaded runtime either, even with the current implementation. For example, if you call foo_sync several times at the same time, each call will use up one of the threads, and Tokio only spawns one thread per cpu on your machine. This means that after 4-8 or so calls at the same time, you are out of threads, causing you to deadlock just like on the basic scheduler.

And even if you don't run out of threads, the performance of the rest of your application will be incredibly poorly affected by doing this, because everything that was previously running on the same thread will have to be moved to some other thread. This doesn't happen immediately, and will cause those tasks to pause for a considerable amount of time.

1 Like

Maybe I was a bit too cautious in my warnings, but just to be clear, I don't recommend this solution. Even though the number of threads can be configured it's not a good way of dealing with this since there is no guarantee that the spawned tasks are run to completion and are dropped if the runtime shuts down (I didn't consider the deadlock when running out of threads though).

AFAIK there is no good way to achieve what OP is asking. If for some reason I absolutely had to solve it exactly the way it was described (which I would try to avoid) I would probably do something like this using rayon::ThreadPool + a channel instead of spawning new threads and just accept that it's not pretty or optimal.

@alice There is no inherent problem in instantiating several runtimes on seperate threads as long as they don't "overlap" is it? Apart from it being a sub-optimal design.

1 Like

Starting an extra runtime wont deadlock, but you will still stop the world on the runtime calling foo_sync.

2 Likes

I would like to thank everyone for their response and for helping me understand what is going on underneath the hood with the tokio runtime. I have made the decision to just remove the foo_sync and force the caller to make the decision to invoke foo_async either as a true async function or by using spawn_blocking.

2 Likes

This topic was automatically closed 90 days after the last reply. New replies are no longer allowed.