On mixing sync threads and async tasks

Recently I have been getting into async and tokio, with great success despite the tribulations I brought up here: "A year in Rust. And I still can't read the docs!"

My latest creation has tokio-tungstenite serving up web sockets to a test client that also uses tokio-tungstenite.

Having got that basic ws functionality working I proceeded to introduce NATS messaging into the mix using the nats crate. You see, those web socket clients are expecting to be fed data that derives from subscription to our NATS server (among other things). Each ws client's data requirements may require multiple subscriptions to different 'subjects' from NATS.

So here comes the problem:

When a ws client connects, a new tokio task is spawned to handle it. That task makes the necessary subscriptions to NATS and then sits in a loop waiting for responses from NATS and passing them back down the web socket.

Well, the NATS crate does not support async. Waiting on responses is a blocking call.

OK, no problem I thought, just do that call in a task::spawn_blocking and .await the response. So that is what I do and it is working just fine.

But, what did I actually just do?

As far as I understand all the async code could in principle run on a single thread on a single core, no matter how many tasks are involved. Thus saving all that thread overhead. But, as far as I understand, spawn_blocking fires up an actual new thread from which to run the blocking code, thus not blocking the async tasks.

Which is great, but imagine I had a million async tasks all making blocking calls using `spawn_blocking`. Now I have degenerated all that async stuff into a million good old fashioned threads. Thus defeating the whole point.

I have framed this question in terms of tokio and nats but it must be a general problem. Is there an efficient way to do this?

Without knowing any of the details and no idea how NATS works, maybe you could put an async wrapper around NATS that works roughly like this:
Have one "regular" thread block on receiving messages from NATS (or maybe one thread per NATS "subject"). Have some function that subscribes to a NATS subject and returns the receiver end of an async channel. Your NATS wrapper keeps the sender end of the channel. Every time your blocking thread receives a new message, it goes through the list of subscribers and forwards the message through the async channel(s).
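
A minimal sketch of that wrapper idea, with loud assumptions: the `Hub` type, the `Message` alias and the simulated `source` channel are all made up for illustration, nothing here touches the real nats crate, and `std::sync::mpsc` stands in for the async channel (in a tokio program the subscriber ends would more likely be tokio unbounded mpsc channels).

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical message type: (subject, payload).
type Message = (String, String);

/// Shared table of subscribers, keyed by subject.
#[derive(Clone, Default)]
struct Hub {
    subscribers: Arc<Mutex<HashMap<String, Vec<Sender<String>>>>>,
}

impl Hub {
    /// Register interest in `subject`; the caller keeps the receiving end.
    fn subscribe(&self, subject: &str) -> Receiver<String> {
        let (tx, rx) = channel();
        self.subscribers
            .lock()
            .unwrap()
            .entry(subject.to_string())
            .or_default()
            .push(tx);
        rx
    }

    /// Runs on the one blocking thread: forwards each incoming message
    /// to every subscriber of its subject.
    fn run(&self, incoming: Receiver<Message>) {
        for (subject, payload) in incoming {
            let mut table = self.subscribers.lock().unwrap();
            if let Some(list) = table.get_mut(&subject) {
                // Drop subscribers whose receiving end has gone away.
                list.retain(|tx| tx.send(payload.clone()).is_ok());
            }
        }
    }
}

/// Small demonstration with a simulated message source.
fn fan_out_demo() -> (Vec<String>, Vec<String>) {
    let hub = Hub::default();
    let a = hub.subscribe("temperature");
    let b = hub.subscribe("temperature");

    // `source` stands in for the blocking NATS client (an assumption).
    let (source, incoming) = channel::<Message>();
    let worker = {
        let hub = hub.clone();
        thread::spawn(move || hub.run(incoming))
    };

    source.send(("temperature".into(), "21.5".into())).unwrap();
    source.send(("pressure".into(), "1013".into())).unwrap(); // nobody subscribed
    source.send(("temperature".into(), "21.7".into())).unwrap();
    drop(source); // closing the source ends the worker loop
    worker.join().unwrap();

    // Collect whatever has been delivered without blocking.
    (a.try_iter().collect(), b.try_iter().collect())
}
```

Both subscribers to "temperature" get both readings, and the "pressure" message is simply dropped because nobody asked for it. Only one thread ever blocks, no matter how many subscribers there are.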

Can I summarize that suggestion as:

Keep all the synchronous waiting for NATS responses on a single real thread, where it can block to its heart's content. Have that thread match up the responses to the requests from my web socket clients and forward the responses to the appropriate client tasks via channels?

That sounds like something that occurred to me before I started. I did not want to go that way for a couple of reasons:

  1. I have to maintain some record of what clients are connected and what data they have subscribed to and so on.

That is perhaps not so bad, I will likely end up keeping a list of currently connected clients anyway. But it bugs me because it is reproducing the functionality of NATS that I adopted NATS for in the first place. I basically end up implementing the pub/sub in my web socket server when it is already in the NATS server. This could get complicated when authentication and such is added to the mix, which NATS already handles.

  2. It's just more complex with all those channels and stuff to take care of.

I'm not even sure how to do it. What kind of channel can get you from a real thread into an async task and back again? If that real thread is wrapped in an async task, that just pushes the problem down a level, and the sync to async problem is still there.

Unless someone has ideas about this.

  3. It is on the NATS road map to provide a web socket interface for pub/sub to their servers. So hopefully the need for all this goes away.

  4. Or it is on the road map to make the NATS Rust client async. Which would be great.

Anyway, my sync to async musing here is not about NATS specifically. This problem must be common in other situations. How do people deal with it in a way that does not degenerate all the async tasks into thousands of sync threads?

What I was imagining is some way to have the sync thread inject events into the async runtime, so that it responds to them in the same way it responds to data arriving on sockets etc., and then schedules the async task to handle them.

To go from async to sync, you can use an unbounded crossbeam channel: send never blocks, so it is safe to call from async code. To send the other way, a tokio unbounded mpsc channel works because only the receive end is async. At least that’s what I’ve used in the past.

And in general if you need to send a message and get a response back, you can send a response channel in the message. So for async to sync I would include a tokio oneshot sender for the return message. And for sync to async, I would include a bounded crossbeam channel.
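
Here is a rough sketch of the "response channel inside the message" pattern using only the standard library, so it is not the tokio/crossbeam combination described above. The `Request` struct, `spawn_worker` and `ask` are hypothetical names, and `std::sync::mpsc::sync_channel(1)` stands in for the reply channel; in an async task the reply end would be a tokio oneshot as suggested.

```rust
use std::sync::mpsc::{channel, sync_channel, Sender, SyncSender};
use std::thread;

/// Hypothetical request: the subject wanted plus a channel for the answer.
struct Request {
    subject: String,
    reply: SyncSender<String>,
}

/// Start a worker thread that answers each request on the channel it carries.
fn spawn_worker() -> Sender<Request> {
    let (tx, rx) = channel::<Request>();
    thread::spawn(move || {
        // The loop ends when every `Sender<Request>` has been dropped.
        for req in rx {
            // A real worker would make its blocking call here.
            let answer = format!("subscribed to {}", req.subject);
            // Ignore the error if the requester has already gone away.
            let _ = req.reply.send(answer);
        }
    });
    tx
}

/// Send one request and wait for its reply.
fn ask(worker: &Sender<Request>, subject: &str) -> String {
    // Capacity 1: the worker can deliver the single reply without waiting.
    let (reply, response) = sync_channel(1);
    worker
        .send(Request { subject: subject.to_string(), reply })
        .expect("worker thread has stopped");
    response.recv().expect("worker dropped the reply channel")
}
```

Because every request carries its own reply channel, the worker needs no table of who asked for what; it just answers on whatever channel arrived with the request.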

Regarding spawn_blocking, it uses a threadpool with a large upper limit of tasks, and will as such exert backpressure on async code if too many of them are spawned.

That said, if all your async code does is spawn some blocking tasks, then yes, you lose the advantage of async. If only part of the code is blocking, then it is probably fine, although the blocking code can become a bottleneck.

Thanks all, some great suggestions and pointers there.

In this case, if I dream of reaching the scale of thousands/millions of web socket connections, then every one of those connections will be waiting on publications from NATS at the rate of ten per second. Each one of them would have spawned a real thread to wait on the response it wants. Thread pool or not, that sounds like a lot of real threads and defeats the idea of using async altogether.

Meanwhile the poor old NATS server is having to maintain millions of subscriptions and route responses back to the right places.

Looks like a better idea would be:

  1. Have a single real sync thread that makes subscription requests to the NATS server when asked to do so.

  2. Have another single real thread that listens on all the publications from the NATS server.

  3. Run all the async web socket stuff as I do now with tokio.

  4. Those async web socket client handlers will ask that synchronous subsystem above to make the actual NATS subscriptions, then .await the responses from the sync subsystem.

This plan requires suitable channels between all the async tasks and the sync threads, which is the missing link for me. Sounds like the suggestions made here will solve that problem.

It will also require some bookkeeping to keep track of subscription subjects and which ws client asked for them. So that the sync side can route responses back to the async side.

The end result would consolidate subscription subjects that are common to many ws clients. Much more efficient all around.
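
The bookkeeping part could look roughly like the sketch below. It is only an illustration under my own assumptions: `ClientId`, `Subscriptions` and its methods are invented names, and the upstream NATS subscribe/unsubscribe calls are left out entirely. The point is just that a subject needs one upstream subscription however many ws clients share it.

```rust
use std::collections::{HashMap, HashSet};

/// Hypothetical identifier for a connected web socket client.
type ClientId = u32;

/// Tracks which clients want which subjects.
#[derive(Default)]
struct Subscriptions {
    by_subject: HashMap<String, HashSet<ClientId>>,
}

impl Subscriptions {
    /// Record the client's interest. Returns true when this is the first
    /// client for the subject, i.e. an upstream subscription is needed.
    fn add(&mut self, client: ClientId, subject: &str) -> bool {
        let clients = self.by_subject.entry(subject.to_string()).or_default();
        let first = clients.is_empty();
        clients.insert(client);
        first
    }

    /// Forget a disconnected client. Returns the subjects that no longer
    /// have any clients and could be unsubscribed upstream.
    fn remove_client(&mut self, client: ClientId) -> Vec<String> {
        let mut unused = Vec::new();
        self.by_subject.retain(|subject, clients| {
            clients.remove(&client);
            if clients.is_empty() {
                unused.push(subject.clone());
                false // drop the now-empty entry
            } else {
                true
            }
        });
        unused.sort();
        unused
    }

    /// Clients that should receive a publication on `subject`, in sorted order.
    fn clients_for(&self, subject: &str) -> Vec<ClientId> {
        let mut ids: Vec<ClientId> = self
            .by_subject
            .get(subject)
            .map(|set| set.iter().copied().collect())
            .unwrap_or_default();
        ids.sort();
        ids
    }
}
```

The thread that talks to NATS would call `add` when a client asks for a subject and only subscribe upstream when it returns true, then use `clients_for` to route each publication back to the right channels.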

This will never reach such giddy heights but what I have now bugs me. I'll start on plan B tomorrow.

Thanks again.

I started to scribble some notes about what drewkett said above, so as to remind me where I'm heading tomorrow. Looking around the docs for the various items, I found exactly the answer to this thread's question written plain as day:

Communicating between sync and async code

When you want to communicate between synchronous and asynchronous code, there are two situations to consider:

Bounded channel: If you need a bounded channel, you should use a bounded Tokio mpsc channel for both directions of communication. To call the async send or recv methods in sync code, you will need to use Handle::block_on, which allow you to execute an async method in synchronous code. This is necessary because a bounded channel may need to wait for additional capacity to become available.

Unbounded channel: You should use the kind of channel that matches where the receiver is. So for sending a message from async to sync, you should use the standard library unbounded channel or crossbeam. Similarly, for sending a message from sync to async, you should use an unbounded Tokio mpsc channel.

From the Tokio mpsc page here: https://docs.rs/tokio/0.2.22/tokio/sync/mpsc/index.html

Marvelous. The job is as good as done :slight_smile:

Thanks all.

I'm glad you like it. The next version of Tokio should include a blocking_send and blocking_recv to better handle the bounded case.

I love it.

I was expecting to be in for another two-day struggle with the compiler and that "I can't read the docs" problem I have.

It was up and running bright and early this morning. All the NATS interfacing (pub/sub) is separated out into its own subsystem of real threads, hooked together with channels as suggested here.

I do like the separation of concerns that brings. Designing with tasks/threads and channels has always appealed to me. Takes me back to the days of the Transputer and its Occam language. Or the way we designed multi-threaded/multi-core radar processing systems before that.

Still work to be done to deal with clients disconnecting, handling all the possible errors and such but the basic sync to async mechanics is in place.

Thanks for the great advice all.

Aside:

Reminiscing :grin:

Dang! This whole thread was based on my using the nats crate version 0.7.2, which was the version on crates.io at the time and did not have an async API.

Having gone to all the trouble of getting my async web socket world to talk to the sync nats world today I find out that there is nats 0.7.4 which has an async API!

This is great news but grrr.... back to the drawing board...
