Strategy for converting sync project to async?

Obtain a Handle to the runtime, either by cloning the one returned by Runtime::handle or by calling Handle::current from inside the runtime context (e.g. within an async fn). If you use Handle::current, you must call it before leaving the runtime context and entering the rayon thread pool.

Once you have a handle, you can do this:

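```rust
// Tokio 0.2 API: Handle::enter takes a closure and runs it inside the runtime context.
handle.enter(|| futures::executor::block_on(my_future))
```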

This uses Handle::enter to enter the Tokio runtime context, which makes shared resources such as the IO reactor and timing utilities available to the future you're running. Note that using block_on means the future itself is polled on the rayon thread rather than somewhere in the Tokio threadpool. Since the futures executor is very bare-bones, it has essentially no setup cost, which makes this the current best way of blocking on a future without mutable access to the Runtime object itself.
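To see why such an executor has essentially no setup cost, here is a minimal sketch of a block_on-style loop using only the standard library. It is an illustration of the idea, not the futures crate's actual implementation: it polls the future on the calling thread and parks that thread until a waker unparks it. On its own it does not enter any Tokio context; that is what the Handle::enter wrapper is for.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Waker that unparks the thread currently blocking on the future.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Minimal block_on: polls `fut` on the calling thread until it completes.
/// The only "setup" is allocating a single Arc'd waker.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            // Sleep until a waker calls unpark; spurious wakeups simply re-poll.
            Poll::Pending => thread::park(),
        }
    }
}
```

For example, block_on(async { 40 + 2 }) returns 42 without any thread pool being created.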

Every call to Runtime::new creates a new, separate Tokio threadpool with its own set of shared resources (e.g. IO and timers), so creating runtimes this way is not recommended. Unfortunately Handle::current only works inside the Tokio runtime context, so you will need to find a way to pass handles around when you are deep inside the rayon threadpool.

It shouldn't be a problem to have both a rayon and Tokio threadpool at the same time.

Note that if you call rayon from spawn_blocking, then as far as I know rayon will use the thread you called it from to do some of the work and delegate other parts to the rayon threadpool. Since spawn_blocking still runs inside the Tokio context, this means some of your code will run inside the context and some of it won't.

When I worked at Omnio we converted the code top-down. You can use spawn_blocking to perform blocking operations from the asynchronous code, including waiting for rayon tasks. An alternative would be to use message passing to send back the result when rayon finishes.
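A minimal sketch of the message-passing alternative: the CPU-bound work runs elsewhere and puts its result into a one-shot slot that the async side awaits, so no thread sits blocked in spawn_blocking while waiting. To stay dependency-free, it assumes a plain std::thread in place of rayon::spawn, a hand-rolled one-shot future in place of something like tokio::sync::oneshot, and a tiny block_on in place of the Tokio runtime. The name spawn_cpu_work is hypothetical.

```rust
use std::future::Future;
use std::pin::{pin, Pin};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Slot shared between the CPU-bound worker and the awaiting task.
struct Shared<T> {
    value: Option<T>,
    waker: Option<Waker>,
}

/// Receiving half: a future that resolves once the worker sends its result.
struct ResultFuture<T>(Arc<Mutex<Shared<T>>>);

impl<T> Future for ResultFuture<T> {
    type Output = T;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
        let mut shared = self.0.lock().unwrap();
        match shared.value.take() {
            Some(value) => Poll::Ready(value),
            None => {
                // Not done yet: remember whom to wake when the result arrives.
                shared.waker = Some(cx.waker().clone());
                Poll::Pending
            }
        }
    }
}

/// Hypothetical helper: run `work` on a separate thread (standing in for
/// `rayon::spawn`) and hand the result back through the one-shot slot.
fn spawn_cpu_work<T, F>(work: F) -> ResultFuture<T>
where
    T: Send + 'static,
    F: FnOnce() -> T + Send + 'static,
{
    let shared = Arc::new(Mutex::new(Shared { value: None, waker: None }));
    let sender = Arc::clone(&shared);
    thread::spawn(move || {
        let result = work();
        // Store the result under the lock, then wake outside of it.
        let waker = {
            let mut slot = sender.lock().unwrap();
            slot.value = Some(result);
            slot.waker.take()
        };
        if let Some(waker) = waker {
            waker.wake();
        }
    });
    ResultFuture(shared)
}

/// Tiny stand-in executor so the sketch runs without Tokio.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
        thread::park();
    }
}
```

With a real runtime, the async side would simply `.await` the returned future inside a task, and the Tokio worker thread stays free to run other tasks until the result arrives.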

When you need to bound how much runs concurrently, a semaphore makes sense in some cases. Other times, spawning 4-8 workers that fetch jobs from a shared job queue makes sense. Creating a stream of jobs (e.g. with stream::iter) and processing it with StreamExt::for_each_concurrent can also work quite nicely.
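A minimal sketch of the "fixed set of workers pulling from a shared job queue" pattern. To keep it dependency-free, it uses std threads and std::sync::mpsc instead of Tokio tasks and channels; the squaring job and the run_with_workers name are placeholders. The structure (N workers, one queue, closing the queue to stop them) is the part that carries over to the async version.

```rust
use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::thread;

/// Process `jobs` with `workers` threads that all pull from one shared queue,
/// so at most `workers` jobs are in flight at once. Results are re-sorted by
/// job index because workers finish in arbitrary order.
fn run_with_workers(jobs: Vec<u64>, workers: usize) -> Vec<u64> {
    let (job_tx, job_rx) = mpsc::channel::<(usize, u64)>();
    let (res_tx, res_rx) = mpsc::channel::<(usize, u64)>();
    // mpsc receivers can't be cloned, so the workers share one behind a mutex.
    let job_rx = Arc::new(Mutex::new(job_rx));

    let handles: Vec<_> = (0..workers)
        .map(|_| {
            let job_rx = Arc::clone(&job_rx);
            let res_tx = res_tx.clone();
            thread::spawn(move || loop {
                // The guard is dropped at the end of this statement.
                let next = job_rx.lock().unwrap().recv();
                match next {
                    Ok((i, n)) => res_tx.send((i, n * n)).unwrap(),
                    Err(_) => break, // queue closed and drained
                }
            })
        })
        .collect();
    drop(res_tx); // so res_rx ends once every worker has exited

    for job in jobs.into_iter().enumerate() {
        job_tx.send(job).unwrap();
    }
    drop(job_tx); // closing the queue tells the workers to stop

    let mut results: Vec<(usize, u64)> = res_rx.iter().collect();
    for handle in handles {
        handle.join().unwrap();
    }
    results.sort_by_key(|&(i, _)| i);
    results.into_iter().map(|(_, r)| r).collect()
}
```

For example, run_with_workers(vec![1, 2, 3], 2) returns vec![1, 4, 9]. The worker count is the concurrency limit, which is the same knob a semaphore's permit count or for_each_concurrent's limit argument would give you.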

Regarding join_all: watch out for the fact that when joining futures, a wakeup of any one of the futures you're joining causes all of them to be polled again. To avoid this, spawn the futures as separate tasks or use FuturesUnordered, which only polls the futures that were actually woken.
