Run a tokio task on each worker thread

I have some global state, and to make it fast I want to use thread locals instead of a static mutex. The problem is that eventually (for example, every five seconds) I want to do some work on all of these thread locals, such as updating or aggregating them. My idea was to use a function similar to tokio::spawn that spawns the task on every worker thread instead of a single one. Is that possible? Or is there another way to manage thread locals in tokio?

Have you considered using message passing instead of shared memory? Maybe you can implement your application as actors, where you pass your global state (or updates to it) around as messages, rather than as a shared memory region.

The work includes two parts:

  1. The very frequent and performance sensitive part, which can't afford atomic operations
  2. The infrequent synchronization part

Is it possible to do this with actors in a way that part 1 doesn't need any atomic operations?

Assuming your "frequent part" is read only, this made me think of arc-swap because it is a good solution for "read-mostly write-seldom scenarios" as the doc mentions. The idea is that:

  • When (frequently) reading the state in each task you load it (which is an atomic op) and then operate on it as a shared ref (&T) for some unit of work. There is no locking. You can load the state as frequently or infrequently as you'd like, to trade off the cost of the atomic op with the staleness of the state.

  • When (infrequently) updating the shared state, you create the new state from the old state (copying it entirely, or at least copying the mutable parts) and swap in the new state (which is an atomic op). From that point onward, a reader loading the state will get the updated state. Note that because there is no locking, readers are not blocked while the state is being updated.

Because of the way this works, you shouldn't need multiple copies of the state or thread locals to simply read the state efficiently in multiple tasks.

This is not integrated with async in any way, but that shouldn't matter since there is no locking or blocking. If you're interested, take a look at the docs on its limitations, performance, patterns, etc. Judging by its download count it is a very popular crate, so it is not a risky dependency.


Thanks! It is indeed an interesting crate, but in my case the frequent part writes to the global state. The frequent part is actually calculating something, and the infrequent part aggregates those results and reports them to the user. Is there any crate for this use case?

CPU-bound tasks don't go well with asynchronous runtimes:

Async code should never spend a long time without reaching an .await.

You should consider moving the tasks that perform the computation off of tokio (maybe onto something like rayon) and only doing the aggregation/reporting to the user from within the runtime. Again, you could use message passing to send the data from the computing tasks to the aggregator; tokio::sync::mpsc::Sender has a blocking_send method for sending from synchronous code.


The task is not pure computation. It is something like counting the number of bytes transmitted over each connection. There are millions of connections; each connection's task is I/O-bound and yields control back to the runtime quickly, but overall the counting takes a considerable amount of CPU time. What is the best approach in this situation?

What about the infrequent aggregation task, is it purely CPU or does it also interleave IO and CPU?

Just trying to get a complete picture.

Tokio does not have a feature to run a particular piece of code on every worker thread.

Would something like on_thread_park work for you? It will get called periodically on each worker thread whenever the worker thread is doing work. That said, if the worker thread goes to sleep for an extended period of time, then it won't get called.


Could the tasks updating the thread locals periodically send the collected data to an aggregation task via an mpsc queue? This could be time based, or perhaps just based on a counter in the thread local struct that you increment each time you update the data.

When sending the data, I assume you may need to copy it and reset/clear it in the thread local. Fortunately this doesn't require locking or coordination because it is a thread local.

The drawback is that if there is no activity on a thread, collected data won't be aggregated.

This is really just the inverse of what @jofas suggested above.