Horrible Experience With Async Today

I have a Rust-based app that, as in my previous Rust work, avoids async. (I don't want to start anything "political"; I've just preferred not to use it in the past, and have achieved very high performance using threads and channels.)

I said it "avoids async" but it could not completely, because it is a gRPC server and Tonic uses async.

That was fine, because I was able to easily confine async to the top-level of the app: The gRPC interface impl methods are async, but call no async code.

This all changed radically when an underlying service, deep in the implementation, was switched from HTTP to gRPC. My app is a client of that new gRPC interface, which is unrelated to my own server.

This is what happened:

  • I tried to encapsulate async deep in that service layer by constructing my own Tokio runtime there, and wrapping the gRPC calls in block_on().

  • I thought this was a great idea, as everything compiled and looked neat.

  • WRONG - runtime error - "cannot start a runtime within a runtime..."

  • I tried using tokio::task::spawn_blocking as an alternative, but its handle has to be awaited, which of course is not possible in non-async functions (short of propagating Futures upward).

  • I bit the bullet - as an experiment - and changed the three layers of the application (the gRPC server entrypoints, the services layer, and dependent services) to use async fn instead of fn

  • I then ran into errors I did not expect, of this sort:

    error: future cannot be sent between threads safely
      --> server/src/main.rs:72:57
       |
    72 |   ) -> Result<Response<CreateOrdersResponse>, Status> {
       |  _________________________________________________________^
    73 | |     let span = span!(Level::INFO, "OrderService.create");
    74 | |     let _guard = span.enter();
    75 | |     info!("Received request: {:?}", request);
    ...  |
    89 | |     // }
    90 | | }
       | |_____^ future created by async block is not `Send`
       |
       = help: within `{async block@server/src/main.rs:72:57: 90:6}`, the trait `Send` is not implemented for `impl Future<Output = ()>`
    note: `<O as OrderService>::create_order` is an `async fn` in trait, which does not automatically imply that its future is `Send`
       |
     9 |     async fn create_order(&self, order: Order) -> ();
       |     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    note: future is not `Send` as it awaits another future which is not `Send`
      --> server/src/main.rs:78:9
       |
    78 |         self.orders.create_order(order).await;
       |         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ await occurs here on type `impl Future<Output = ()>`, which is not `Send`
       = note: required for the cast from `Pin<Box<{async block@server/src/main.rs:72:57: 90:6}>>` to `Pin<Box<dyn Future<Output = Result<tonic::Response<CreateOrdersResponse>, Status>> + Send>>`

Yikes! WTF? Why are these Futures not Send when their contents are? (Order is a simple struct built from primitives.)

I then discovered that using async functions in traits is "not recommended" and is likely the cause of this problem. Are you possibly kidding me? Traits are Rust's main mechanism for implementation abstraction, and I use them throughout to put services behind an interface I can mock for unit testing. I can't stop using traits!

I then took the compiler's advice and started redoing my trait function signatures:

    - async fn create_order(&self, order: Order) -> Result<Order, String>;
    + fn create_order(&self, order: Order) -> impl std::future::Future<Output = Result<Order, String>> + Send;

At this point I was rather alarmed, as now I am realizing that swapping out an HTTP service for a gRPC one at the bottom of a dependency graph has indeed "gone viral," as they said, mutating the fn signatures across the board.

That still wasn't it, though. Because there seems to be no way to map the result or the error "channels" of a future in Rust, none of that code worked anymore. My code was translating from gRPC artifacts to domain objects and from proprietary errors to uniform ones at the various levels, etc.

This community has been great to me since I started Rust programming a year or more ago. I hope you can help now, as I am literally sick to my stomach. I am not going to make the dev deadline, and I am really at a loss as to how to proceed here.

Specific questions:

  • Is there ANY way with Tokio, that I haven't thought of, to encapsulate async at a "bottom" level - even if I'm in the context of a Runtime? (If so, that solves the whole problem.)

  • Is there any way to make async fns in traits that will actually work - that won't result in the "future created by async block is not Send" error?

I am not blaming async per se for this mess. Async mechanisms in any language are usually viral - it's just the way it goes. Rust's low-level nature - this strange matter with traits - exacerbates the problem.

I am hoping very much I can actually encapsulate async at this low level - exchange an HTTP service for an async (gRPC) one - even in the context of an async runtime (which I'm in, again, because the top level is async, because that's how Tonic does it).


I don't use async so I'm not sure this is the best solution or whether it would work for you (a separate thread is needed), but I wonder if you've seen this SO answer to that problem.

One approach that's fairly straightforward is to wrap that upstream async client in a sync channel interface.

The basic idea is that you have a request channel on which you use something like blocking_send() to send a pair of the request body and a one-shot response channel. You then block on the response channel for the reply.

The wrapped API then sits in a loop, spawned from main, that pulls requests off the other end, spawns a task to invoke each one, and sends the response back on the one-shot.

Most of the thinking is about how much buffering you need and error handling, it's otherwise fairly simple to pull off.

Less efficient of course, but you're already making network calls here, it shouldn't tip the needle. And you can put anything you want in the request, of course, so you can batch up work or whatever.
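A minimal sketch of that pattern, using only std::sync::mpsc so it runs without Tokio: a bounded request channel carries each request together with a single-use reply channel, and a worker loop answers them. In the real app the worker would presumably be a task on the existing runtime awaiting the gRPC client, with tokio::sync::mpsc::Sender::blocking_send on the calling side and a tokio::sync::oneshot channel for the reply. Tokio's blocking_* methods panic when called from inside an async execution context, so the synchronous caller needs to be on a thread outside the runtime (for example one started with std::thread::spawn). Request, Reply, BlockingClient, start_worker and stub_upstream_call are hypothetical names for illustration.

```rust
use std::sync::mpsc::{self, Receiver, Sender, SyncSender};
use std::thread;

// Hypothetical request/response types for illustration.
pub struct Request {
    pub order_id: u64,
}
pub type Reply = Result<String, String>;

// Each message carries the request plus a single-use reply channel.
type Envelope = (Request, Sender<Reply>);

// Synchronous handle that the rest of the (non-async) app calls.
#[derive(Clone)]
pub struct BlockingClient {
    tx: SyncSender<Envelope>,
}

impl BlockingClient {
    pub fn call(&self, req: Request) -> Reply {
        let (reply_tx, reply_rx) = mpsc::channel();
        // Blocks if `buffer` requests are already queued (the tokio analogue is blocking_send).
        self.tx
            .send((req, reply_tx))
            .map_err(|_| "worker has shut down".to_string())?;
        // Blocks until the worker answers (the tokio analogue is oneshot blocking_recv).
        reply_rx
            .recv()
            .map_err(|_| "worker dropped the request".to_string())?
    }
}

// Starts the worker loop. In a Tokio app this would be a task spawned on the
// runtime that awaits the gRPC client; here a plain thread and a stub stand in.
pub fn start_worker(buffer: usize) -> BlockingClient {
    let (tx, rx) = mpsc::sync_channel::<Envelope>(buffer);
    thread::spawn(move || worker_loop(rx));
    BlockingClient { tx }
}

fn worker_loop(rx: Receiver<Envelope>) {
    // The loop ends once every BlockingClient (and so every sender) is dropped.
    for (req, reply_tx) in rx {
        let reply = stub_upstream_call(&req);
        // The caller may have given up; ignore a closed reply channel.
        let _ = reply_tx.send(reply);
    }
}

// Hypothetical stand-in for the async upstream (gRPC) call.
fn stub_upstream_call(req: &Request) -> Reply {
    if req.order_id == 0 {
        Err("invalid id".to_string())
    } else {
        Ok(format!("order {} created", req.order_id))
    }
}
```

The buffer size passed to sync_channel is the buffering knob: callers block in send() once that many requests are waiting, and clones of BlockingClient all feed the same worker.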


This could work - thank you.

I'm still very interested in hearing whether there's a way to use async fn in traits without these issues, and also in some explanation of why you apparently can't transform (map/map_err) futures. That is very inconvenient indeed, and seems to run counter to Rust's general intelligence about lazy evaluation, etc.

It did work. Code:

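        info!("Received request: {:?}", request);
        // Take the first order; reply with an error instead of panicking if there is none.
        let first = request
            .into_inner()
            .orders
            .into_iter()
            .next()
            .ok_or_else(|| Status::invalid_argument("request contains no orders"))?;
        let order = from_request(first);
        let service = Arc::clone(&self.orders);
        // Run the synchronous service on a fresh OS thread, which has no Tokio runtime
        // context, so a block_on() inside create_order() does not panic. join() still
        // blocks the current async worker thread until the call finishes.
        let result = thread::spawn(move || service.create_order(order)).join();

        match result {
            Ok(_) => Ok(Response::new(CreateOrdersResponse {
                status: "Success".to_string(),
                results: vec![],
            })),
            // join() is Err only if the spawned thread panicked; an error value returned
            // by create_order() would still land in Ok(_) above.
            Err(_) => Err(Status::internal("Order creation failed")),
        }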

This allowed me to keep the rest of the application as-is. No interface changes.

Mission accomplished.


Being able to mark the returned futures as Send is actually a big missing feature of async fn in traits, and it was a known limitation when RPITIT[1] was stabilized in Rust 1.75.

The blog post announcing async fn in traits recommends using the trait-variant crate, which can be used like this:

#[trait_variant::make(Send)]
trait Trait {
    async fn foo(&self) -> Vec<u8>;
}

The #[trait_variant::make] macro automatically rewrites that to the following:

trait Trait: Send {
    fn foo(&self) -> impl Future<Output = Vec<u8>> + Send;
}

This trait can even be implemented with async fn:

impl Trait for SomeType {
    async fn foo(&self) -> Vec<u8> {
        self.something().await
    }
}
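To tie this back to the error at the top of the thread, here is a sketch of an OrderService trait with the Send bound written out by hand, an implementation that still uses async fn, and a generic function that boxes the call as Pin<Box<dyn Future + Send>>, the same cast the compiler complained about. Order, InMemoryOrders and handle are hypothetical stand-ins; it needs Rust 1.75 or later and no external crates.

```rust
use std::future::Future;
use std::pin::Pin;

// Hypothetical domain type standing in for the poster's Order.
#[derive(Debug, Clone, PartialEq)]
pub struct Order {
    pub id: u64,
}

pub trait OrderService {
    // Spelling out `+ Send` is what lets generic callers rely on the future being Send.
    fn create_order(&self, order: Order) -> impl Future<Output = Result<Order, String>> + Send;
}

pub struct InMemoryOrders;

impl OrderService for InMemoryOrders {
    // Implementations may still use `async fn`; the compiler checks this future is Send.
    async fn create_order(&self, order: Order) -> Result<Order, String> {
        if order.id == 0 {
            Err("order id must be non-zero".to_string())
        } else {
            Ok(order)
        }
    }
}

// Roughly the shape a Tonic handler needs: a boxed future that is Send.
pub fn handle<'a, S>(
    svc: &'a S,
    order: Order,
) -> Pin<Box<dyn Future<Output = Result<Order, String>> + Send + 'a>>
where
    S: OrderService + Sync,
{
    Box::pin(async move { svc.create_order(order).await })
}
```

If the trait declared `async fn create_order` instead, the body of handle would fail with the same "future cannot be sent between threads safely" error, because a generic caller cannot assume the implementation's future is Send.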

  1. return position impl Trait in traits


I see from that that you just need to manually include Send in the type signature. I should've realized that. And that's trivial - so this is not a big problem.

The fact that Futures aren't transformable (mapping of value & error) IS a big problem, though. I have to think I'm missing something there but it's obviously not in the API.

There are two ways to transform the output of a future:

  • The async block way:

    async move { other_future.await + 1 }
    
  • The futures way, FutureExt::map() (this existed before async, and complex futures were always built with combinators like this):

    use futures::future::FutureExt;
    other_future.map(|x| x + 1)
    

    Or if you specifically want to work with Future<Output = Result<...>>s, there is also TryFutureExt::map_ok() and map_err().
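Applied to the translation-layer problem from the first post, an async block can map both the Ok and the Err side of a Result-producing future while staying lazy, since the block does nothing until it is polled. The sketch uses hypothetical ProtoOrder/RpcError (wire-level) and Order/AppError (domain) types with From conversions, and only std; with futures-util's TryFutureExt in scope, the body would be roughly `fut.map_ok(Order::from).map_err(AppError::from)`.

```rust
use std::future::Future;

// Hypothetical wire-level types (standing in for gRPC-generated structs).
#[derive(Debug)]
pub struct ProtoOrder {
    pub id: u64,
}
#[derive(Debug)]
pub struct RpcError(pub String);

// Hypothetical domain types.
#[derive(Debug, PartialEq)]
pub struct Order {
    pub id: u64,
}
#[derive(Debug, PartialEq)]
pub enum AppError {
    Upstream(String),
}

impl From<ProtoOrder> for Order {
    fn from(p: ProtoOrder) -> Self {
        Order { id: p.id }
    }
}

impl From<RpcError> for AppError {
    fn from(e: RpcError) -> Self {
        AppError::Upstream(e.0)
    }
}

// Maps the value and the error of `fut` without running it yet: the returned
// async block only awaits `fut` when it is itself polled.
pub fn to_domain<F>(fut: F) -> impl Future<Output = Result<Order, AppError>> + Send
where
    F: Future<Output = Result<ProtoOrder, RpcError>> + Send,
{
    async move { fut.await.map(Order::from).map_err(AppError::from) }
}

// Hypothetical upstream call standing in for a generated client method.
pub async fn fetch_order(id: u64) -> Result<ProtoOrder, RpcError> {
    if id == 0 {
        Err(RpcError("not found".to_string()))
    } else {
        Ok(ProtoOrder { id })
    }
}
```

Each layer can wrap the future it receives this way, so the conversions stay where they were in the synchronous version.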


This is great - I thought it had to exist - but I find the docs confusing. I understand the idea of separating interfaces, but the fact that you look at the docs for Future and don't see this is - not ideal. How is someone unfamiliar supposed to know to look for FutureExt? There's not a single reference to it in the docs.

You kind of have to know that the futures crate exists, but from there you can find map by searching in the API docs (search results for “map”). It’s pretty commonly used in async code, so a tutorial will show futures sooner or later (the official Rust async book seems to be very much under construction right now, so I can’t comment on it).

The futures crate isn’t directly advertised from std docs. The idea of having std link to a different crate is generally turned down as “too opinionated”[1], even though futures specifically is in fact in the rust-lang GitHub org and is a standard system for Rust async APIs.
(Then again, if you google “rust futures” the crate actually shows up before std::future::Future)

Nit: Please don’t depend on the entire futures crate. This is my big pet peeve with Rust libraries: facade crates that export more functionality than you really need[2] (and full features that enable too much[3]) lead to dependency bloat.
[Try]FutureExt is defined in futures-util, so use that.[4]


  1. if I remember some previous discussions correctly

  2. futures, num

  3. tokio, syn

  4. Which still (on default features) uses all of syn. *sigh*


This is a feature for quick applications. I do find it odd that so many people seem to forget that all these library crates are eventually getting used somewhere 😋

Thanks for the helpful responses - and what you described here is a really bad situation.

(I could be wrong, but it seems to me that almost the only sort of real issue/large disagreement in the Rust community/ecosystem is over async.)

futures is, really, just another library.[1] std documentation doesn't link to futures in the same way that it doesn't link to async-trait, itertools, tempfile, thiserror or any other popular utility library that builds on std types and traits. The ways people discover such libraries are by reading existing code, tutorials, and recommendations from other people or sites like Crate List - Blessed.rs.

async does have a unique “ecosystem split” problem, which is that IO futures tend to be specific to an executor. But, in my opinion, this should be addressed by encouraging writing library code that is generic, or “sans-IO” — not by the Rust documentation picking a winner. And futures is not in this fight, anyway. (While futures has an executor, that executor notably doesn’t offer IO functionality, or any ambient “this function panics if not called from a futures::executor context” like Tokio does, so it would be difficult to implicitly depend on it being the executor in use.)


  1. At least, it is today, whereas historically it was also the library that defined the Future trait.


I had the same problem while writing a crate. It seems someone has already solved your problem pretty well, but reading this, it sounds like you might be trying to await a future created in one thread from another. I think all the parameters and data of that function, struct, or method must also be Send + 'static, since the compiler needs to be certain that any data referenced across threads is still valid.
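A small std-only illustration of the point about data: a future is Send only if everything it keeps alive across an .await is Send. An Arc is fine to hold across an .await; an Rc is fine only if it is dropped before the .await. The function names are hypothetical, and the non-Send variant is left commented out because it would not compile.

```rust
use std::rc::Rc;
use std::sync::Arc;

// Compile-time check: this call only compiles if `T` is Send.
pub fn assert_send<T: Send>(value: T) -> T {
    value
}

// Stand-in for any real await point (I/O, a gRPC call, a timer, ...).
async fn yield_point() {}

// Arc<usize> is Send, so holding it across the .await keeps the future Send.
pub async fn holds_arc_across_await() -> usize {
    let shared = Arc::new(41usize);
    yield_point().await;
    *shared + 1
}

// The Rc is dropped at the end of the inner block, before the .await,
// so it is not part of the future's saved state and the future stays Send.
pub async fn drops_rc_before_await() -> usize {
    let n = {
        let local = Rc::new(1usize);
        *local
    };
    yield_point().await;
    n
}

// Holding the Rc across the .await makes the future !Send:
//
// async fn holds_rc_across_await() -> usize {
//     let local = Rc::new(1usize);
//     yield_point().await; // `local` is still alive here
//     *local
// }
// assert_send(holds_rc_across_await()); // fails to compile: future is not Send
```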

Just fyi with tokio you can use

Handle::current().block_on( async move {...});

to run something in the current executor. Be aware that it will block the thread tho, so you might want to spawn a new thread and use handle.block_on() in there, if that matters to you. It's not advisable to block a thread from within the Runtime tho (since it can lead to deadlocks), so it might be a bit tricky to use in practice.

That is NOT true. It results in

Cannot start a runtime from within a runtime. This happens because a function (like `block_on`) attempted to block the current thread while the thread is being used to drive asynchronous tasks.

That's part of what started the journey above.

I just tried it again to make sure.

That's what I meant with it being tricky to use, let me give you an example:

fn main() {
    let mut join_handle = None;

    let rt = tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()
        .unwrap();

    rt.block_on(async {
        let handle = tokio::runtime::Handle::current();
        join_handle = Some(std::thread::spawn(move || {
            // Do whatever
            println!("abc");

            let _foo = handle.block_on(async move {
                tokio::time::sleep(std::time::Duration::from_secs(5)).await;
            });

            // Do whatever with the result

            println!("def");
        }));
    });

    if let Some(join_handle) = join_handle {
        let _ = join_handle.join();
    }

    drop(rt);
}

This works because the inner .block_on() runs not inside the existing runtime but in another thread. We are still re-using the one runtime to run the task; we just can't call block_on on a runtime thread, since that would block the thread and stop it from driving any other tasks in the meantime.

As I said, it's a bit tricky to use and I wasn't suggesting that it is a great solution for your usecase, just that it IS in theory possible.

Also note that we wait on the join handle before dropping the runtime: if we didn't, and our thread became an "orphan" (main exits before it and the runtime gets dropped), the block_on would panic since it never received a result from its task.

Although I didn't know that tokio will actually catch you blocking the runtime with block_on and panic, that's interesting to know 🙂


That's essentially the same solution that was already given.

That's fair, and if the existing solutions work for you that's great. I just wanted to note that it is possible in theory, although pretty impractical to actually use.


Tokio has an escape hatch that lets you call block_on from within async code: block_in_place. Using it isn't great, but it's a lot better than spawning a thread and joining it.
