(sync|async) -> sync [...] -> (async) # this old chestnut again

I'm experimentally porting parts of an app to Rust. There is no global async runtime at the moment, and won't be for some time.

Some of the new Rust function implementations are to use other frameworks (e.g. the AWS Rust SDK) which use Tokio.

Some old code that invoked a C-based S3 upload now calls a Rust sync function, which ultimately uses the AWS SDK's Tokio-based async S3 functions.

This means that, for now, I need to instantiate a short-lived Tokio runtime on demand to run the AWS calls. But I also need to detect whether this sync function is already running inside (and blocking) a runtime, because Tokio refuses to start a runtime from within a runtime (block_on panics there), so the nested case needs separate handling.

So this is my attempt:

/* Tips from: https://users.rust-lang.org/t/function-currying-that-ultimately-returns-a-future/62847/8
              https://play.rust-lang.org/?version=stable&mode=debug&edition=2018&gist=1a4302fe790bb8f1f8a95c7aa37c67a1
   But for possible improvement, see: https://users.rust-lang.org/t/simplest-possible-block-on/48364/9
*/
use std::thread;

pub fn run_on_tokio<FA, R>(func: impl FnOnce() -> FA + std::marker::Send + 'static) -> R
where
    FA: std::future::Future<Output = R>,
    R: std::marker::Send + 'static,
{
    // get or make a runtime
    match tokio::runtime::Handle::try_current() {
        Ok(_) => {
            /* A runtime is already active on this thread and block_on would panic here,
               so start a new thread for a new runtime and wait for its result */
            thread::spawn(move || {
                tokio::runtime::Builder::new_current_thread()
                    .enable_all()
                    .build()
                    .unwrap()
                    .block_on(func())
            })
            .join()
            .unwrap()
        }
        Err(_) => {
            /* No runtime exists, so we can run one in this thread */
            tokio::runtime::Builder::new_current_thread()
                .enable_all()
                .build()
                .unwrap()
                .block_on(func())
        }
    }
}
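The comment in the code points at a thread about the simplest possible block_on. For comparison, a std-only executor in that spirit might look like the sketch below (names are illustrative; it needs Rust 1.68+ for std::pin::pin!). It only parks and unparks the calling thread: there is no IO driver or timer, so it cannot replace the Tokio runtime for the AWS SDK futures, which expect to be polled inside a Tokio runtime.

```rust
use std::future::Future;
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Waker that unparks the thread blocked in `block_on`.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Poll `fut` on the current thread, parking between polls until it is woken.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            // Spurious wakeups are harmless: we simply poll again.
            Poll::Pending => thread::park(),
        }
    }
}

/// Test future that returns Pending once (waking itself) before completing.
struct YieldOnce(bool);

impl Future for YieldOnce {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            Poll::Ready(())
        } else {
            self.0 = true;
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

fn main() {
    assert_eq!(block_on(async { 40 + 2 }), 42);
    assert_eq!(block_on(async { YieldOnce(false).await; "done" }), "done");
}
```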

It is invoked from a sync function with something like this ghastly invocation:

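fn do_thing(a: &str, b: &str) -> i32 {
    // run_on_tokio needs a 'static closure, so copy the borrowed arguments into owned Strings first
    let (a, b) = (a.to_owned(), b.to_owned());
    run_on_tokio(move || async move { call_async_do_thing(&a, &b).await })
}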

I know that it is clearly lamentable to start a new thread when the sync function was indirectly blocking (via countless opaque library functions) an async runtime, but it is no worse than what was happening anyway, and will be solved when all the old code is finally ported.

Certainly, if my sync function is (indirectly) called from an async function, we hope that spawn_blocking was used somewhere near the start of the chain.

Setting that aside: is this a reasonable approach? Have I lied fatally about lifetimes? Is the move stuff fine? Are there likely to be parameters for which this fails?

Closures are passed for two reasons: to cope with any number and type of arguments (my generics-fu is weak), and to defer the initial invocation of the wrapped function until it is inside the new runtime's context.
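The deferral can be seen without Tokio at all. In this std-only sketch (run_on_fresh_thread and executing_thread are hypothetical names), the closure mirrors the thread::spawn branch of run_on_tokio: its body, which in the real code builds the future, only runs once the worker thread has started.

```rust
use std::thread::{self, ThreadId};

/// Hypothetical helper mirroring run_on_tokio's thread::spawn branch, minus the runtime:
/// `make` is only invoked once the worker thread is running.
fn run_on_fresh_thread<R: Send + 'static>(make: impl FnOnce() -> R + Send + 'static) -> R {
    thread::spawn(make).join().expect("worker thread panicked")
}

/// Reports which thread actually executed the closure body.
fn executing_thread() -> ThreadId {
    run_on_fresh_thread(|| thread::current().id())
}

fn main() {
    let caller = thread::current().id();
    // The closure body (standing in for "build the future") ran on the worker, not the caller.
    assert_ne!(executing_thread(), caller);
}
```

This is also why run_on_tokio can leave FA without a Send bound: the future is created and polled entirely on the worker thread, and only the closure and the result R cross between threads.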

Perhaps it should be based on some kind of async trait marker, so that when invoked from an async function it uses spawn_blocking instead, but that is just polish, and I'm not sure how to start on that yet. It is probably better to just avoid calling this from async functions like the plague.

Ideally you shouldn't try to nest runtimes in the first place.


True.

But ideally I won't port the entire app all in one go either, so my attempt to deal with the non-ideal situation is necessarily non-ideal.

The nested runtimes may arise through unanticipated code paths, for instance if the new Rust code has to call back into C to fetch something, which may in turn lead to another call into Rust code that does something else implemented as async.

The old blocking implementation blocks the calling thread anyway, so maintaining that behaviour is better than panicking or introducing new runtime errors.

The extern "C" wrapper of the new implementation must provide the adaptor between blocking and async using run_on_tokio without introducing new failure modes.
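A sketch of what such an extern "C" adaptor could look like, using only std; s3_upload_compat, upload_blocking and the 0 / -1 / -2 return convention are hypothetical, and upload_blocking stands in for the sync wrapper that would call run_on_tokio. Copying the C strings into owned Strings sidesteps the 'static bound, and catch_unwind keeps a Rust panic from unwinding into the C caller, so failures come back as error codes. It needs Rust 1.64+ for std::ffi::c_char.

```rust
use std::ffi::{c_char, CStr, CString};
use std::panic::{self, AssertUnwindSafe};

// Hypothetical stand-in for the sync Rust wrapper that would call run_on_tokio.
fn upload_blocking(bucket: String, key: String) -> i32 {
    if bucket.is_empty() || key.is_empty() { 1 } else { 0 }
}

/// Hypothetical C entry point (a real cdylib/staticlib export would also need #[no_mangle]).
/// Returns 0 on success, -1 for null arguments, -2 if the Rust side panicked.
pub unsafe extern "C" fn s3_upload_compat(bucket: *const c_char, key: *const c_char) -> i32 {
    if bucket.is_null() || key.is_null() {
        return -1;
    }
    // Copy into owned Strings so nothing borrowed from C has to satisfy a 'static bound.
    let (bucket, key) = unsafe {
        (
            CStr::from_ptr(bucket).to_string_lossy().into_owned(),
            CStr::from_ptr(key).to_string_lossy().into_owned(),
        )
    };
    // Convert any panic into an error code instead of unwinding across the FFI boundary.
    panic::catch_unwind(AssertUnwindSafe(|| upload_blocking(bucket, key))).unwrap_or(-2)
}

fn main() {
    let bucket = CString::new("my-bucket").unwrap();
    let key = CString::new("path/to/object").unwrap();
    assert_eq!(unsafe { s3_upload_compat(bucket.as_ptr(), key.as_ptr()) }, 0);
    assert_eq!(unsafe { s3_upload_compat(std::ptr::null(), key.as_ptr()) }, -1);
}
```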

New Rust code will just call the new async implementation directly, but that may still involve a call back into Rust via C, depending on how other developers choose to manage their porting activity.

Unfortunately, when using the current-thread runtime there is no way to nest runtimes. You might attempt something like tokio::task::unconstrained plus futures::executor::block_on, but since you can't yield back to the IO driver, IO won't work in the nested call.

Another option you might try is to spawn the current-thread runtime on a background thread, then have each async call just do block_in_place + Handle::block_on. This might work OK for you since it puts the IO driver on the background thread. However, any time you use tokio::spawn, the spawned task will go on the background thread, and there you can't do any nesting, so this assumes you don't need to nest in spawned tasks.

I appreciate your responses. I think there is some confusion.

The solution outlined in my post does have working IO, because it builds a new Tokio runtime inside a thread started with thread::spawn, so the runtimes aren't actually nested.

What I'm doing might be what you meant when you said "Another option you might try is to spawn the current thread runtime on a background thread", except that my thread wasn't spawned using Tokio. But if tokio::spawn is later used to put tasks onto the new thread's runtime, will my join() wait until those tasks are complete? I suspect not, given that block_on only waits for the closure's future, so how would I fix that? (The thread is private to the outer sync function.)

But my main questions, since it appears to work, were: Is this a reasonable approach? Have I lied fatally about lifetimes? Is the move stuff fine? Are there likely to be parameters for which this fails?

Blocking the thread of your runtime with thread.join() like that is usually a terrible idea since it blocks everything on that runtime from running. As for calls to tokio::spawn, they are killed when the runtime is destroyed, i.e. right after you return from block_on. You can't really fix this other than explicitly waiting for those spawned tasks.

Blocking the thread of your runtime with thread.join() like that is usually a terrible idea

It is terrible, but the old non-Rust S3 API also did that. The fix would be to keep porting the calling code to Rust so that this sync wrapper becomes redundant.

As for calls to tokio::spawn , they are killed when the runtime is destroyed, i.e. right after you return from block_on . You can't really fix this other than explicitly waiting for those spawned tasks.

It could be hard to know whether there are any. After blocking on the future, is there no option to block until there is no work left?

Perhaps not: judging from the equivalent code shown in the tokio::main documentation, even #[tokio::main] does not wait for anything except the first future.

My main worry is that I have lied fatally about lifetimes with R: std::marker::Send + 'static, as I don't see anything particularly 'static about R, but the bound seemed necessary (and it is common in similar solutions that I have found).

Is the move stuff fine? Is it likely to bring problems that only Box and Pin can solve? Are there likely to be parameters for which this fails?

thanks

The Send and 'static stuff is fine. The 'static bound means that R cannot be a type that contains non-'static lifetimes, e.g. you can't have R = &'a str (although &'static str is fine).
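A std-only illustration of the same kind of bound, using thread::spawn, which also requires Send + 'static; hand_back and describe are hypothetical names. Owned values and &'static str satisfy R: 'static, while borrowed arguments, as in do_thing, have to be turned into owned values before they are moved into the closure:

```rust
use std::thread;

/// Hypothetical helper with the same bounds as run_on_tokio's closure and result type.
fn hand_back<R: Send + 'static>(make: impl FnOnce() -> R + Send + 'static) -> R {
    thread::spawn(make).join().unwrap()
}

/// Borrowed inputs are copied into owned Strings before crossing the 'static boundary.
fn describe(a: &str, b: &str) -> String {
    let (a, b) = (a.to_owned(), b.to_owned());
    hand_back(move || format!("{a}/{b}"))
}

fn main() {
    // Owned data satisfies R: 'static.
    assert_eq!(hand_back(|| String::from("owned")), "owned");
    // So does a reference with the 'static lifetime, such as a string literal.
    assert_eq!(hand_back(|| "literal"), "literal");
    assert_eq!(describe("bucket", "key"), "bucket/key");

    // This would not compile: the closure borrows `local`, which is not 'static.
    // let local = String::from("x");
    // let r: &str = hand_back(|| local.as_str());
}
```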

