Strategy for converting sync project to async?

I'm planning to migrate a server to std futures. The problem is that it currently mixes network requests with all kinds of CPU-heavy and blocking operations willy-nilly, with a heavy dose of rayon on top. I'm worried it's going to be messy.

I don't think I can convert everything in one go, so I expect I'll need to use block_on or similar to adapt between the sync and async parts of the project.

I'm also not sure how rayon is going to interact with async runtimes. I've converted a smaller project previously, and I was fighting "reactor is not running" errors there. It's not clear to me what the implications are of running tokio::runtime::Runtime::new() 60000 times, or whether Handle::current() can be expected to work everywhere.

  • How do you properly wait on a future from a deeply nested callback of a rayon task?

  • If you've converted some project to async already, what's your experience: is it better to first convert leaf functions (that use HTTP requests), or start from top-level functions (that are server route handlers already running in async env, but may end up still using blocking I/O)

  • What do you recommend to control concurrency of async work? futures::future::join_all literally runs everything at the same time, so a single call may launch 35000 requests at once, killing every server and every database it talks to. I'd rather limit it to 4-8 requests at a time.


Obtain the Handle from the Runtime either by cloning the one returned by Runtime::handle, or by calling Handle::current while inside an async fn. When using Handle::current, you need to do this before leaving the runtime context and entering the rayon thread pool.

Once you have a handle, you can do this:

handle.enter(|| futures::executor::block_on(my_future))

This will use Handle::enter to enter the Tokio runtime context, which makes shared resources such as the IO reactor and timing utilities available to the future you're running. Note that the use of block_on means the future itself is polled on the rayon thread instead of somewhere in the Tokio threadpool. Since the futures executor is very bare-bones, it doesn't really have any setup cost, making this the current best way of blocking on a future without mutable access to the Runtime object itself.

Every call to Runtime::new will create a new separate Tokio threadpool, with its own set of shared resources (e.g. IO and timers). This is not recommended. Unfortunately Handle::current only works inside the Tokio context, so you will need to find a way to pass handles around when deep inside the rayon threadpool.

It shouldn't be a problem to have both a rayon and Tokio threadpool at the same time.

Note that if you call rayon from spawn_blocking, then rayon afaik will be using the thread you called rayon on to do some of the work, and delegate other parts to the rayon threadpool. Since spawn_blocking is still inside the Tokio context, this means you will have code that sometimes is inside the context and sometimes isn't.

When I worked at Omnio we converted the code top-down. You can use spawn_blocking to perform blocking operations from the asynchronous code, including waiting for rayon tasks. An alternative would be to use message passing to send back the result when rayon finishes.

In some cases a semaphore will make sense. Other times spawning 4-8 workers and having them fetch jobs from a shared job queue makes sense. Creating a stream of futures (e.g. with iter) and using StreamExt::for_each_concurrent can also work quite nicely.
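As a rough sketch of the "workers fetching from a shared job queue" option, here is a thread-based version that uses only the standard library. It is the synchronous analogue rather than async code, but async tasks pulling from a channel follow the same shape. The name run_bounded and its parameters are made up for illustration.

```rust
use std::sync::{mpsc, Arc, Mutex};
use std::thread;

/// Applies `work` to every job using at most `workers` threads at a time.
/// Results come back in completion order, tagged with the job's index.
/// (`run_bounded` is a hypothetical helper, not part of any library.)
fn run_bounded<J, R, F>(jobs: Vec<J>, workers: usize, work: F) -> Vec<(usize, R)>
where
    J: Send + 'static,
    R: Send + 'static,
    F: Fn(J) -> R + Send + Sync + 'static,
{
    let (job_tx, job_rx) = mpsc::channel::<(usize, J)>();
    let (result_tx, result_rx) = mpsc::channel::<(usize, R)>();
    let job_rx = Arc::new(Mutex::new(job_rx));
    let work = Arc::new(work);

    // Queue every job up front; dropping the sender closes the queue,
    // so workers see an error from `recv` once it is drained.
    for job in jobs.into_iter().enumerate() {
        job_tx.send(job).expect("job receiver is still alive");
    }
    drop(job_tx);

    let handles: Vec<_> = (0..workers.max(1))
        .map(|_| {
            let job_rx = Arc::clone(&job_rx);
            let result_tx = result_tx.clone();
            let work = Arc::clone(&work);
            thread::spawn(move || loop {
                // Hold the lock only while taking a job, not while running it.
                let next = job_rx.lock().unwrap().recv();
                match next {
                    Ok((index, job)) => result_tx.send((index, (*work)(job))).unwrap(),
                    Err(_) => break, // queue is empty and closed
                }
            })
        })
        .collect();
    // Drop our own sender so the result iterator ends when all workers finish.
    drop(result_tx);

    let results: Vec<_> = result_rx.iter().collect();
    for handle in handles {
        handle.join().unwrap();
    }
    results
}
```

Calling run_bounded(requests, 4, do_request) would then never have more than four requests in flight, however many are queued.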

Regarding join_all: you should watch out for the fact that when joining futures, a wakeup of any future you're joining over wakes up all the futures. To combat this, use spawning or FuturesUnordered.



Is Handle.enter() thread-local or global? If I do:

handle.enter(|| {
    foo.par_iter().for_each(|_| {
        futures::executor::block_on(async {…})

Will block_on that runs potentially on a different thread still be able to find the Handle?

I believe it is a thread-local, hence the challenges I described here:

Note that if you call rayon from spawn_blocking, then rayon afaik will be using the thread you called rayon on to do some of the work, and delegate other parts to the rayon threadpool. Since spawn_blocking is still inside the Tokio context, this means you will have code that sometimes is inside the context and sometimes isn't.

OK, so just to confirm I understand, it would need to be flipped around in that case:

    foo.par_iter().for_each(|_| {
        handle.enter(|| futures::executor::block_on(async {…}) );

(I don't mind if rayon threads get occupied, I'm already wasting them on blocking I/O)

That should definitely work!


I've done the upgrade. Lessons learned:

  • Tokio is really picky about its runtimes and their contexts. I made the mistake of simplifying previously_saved_handle.enter(|| to Handle::current().enter(|| and that was flaky.

  • Desperate hacks involving Runtime::new() didn't work either. It is necessary to pass a Handle around.

  • Calling block_on in an async context is sometimes not allowed (I'm not sure about the specifics: in some places it works, in others I keep getting obscure tokio errors). Where I had an async fn calling a regular fn which then tried to block on another async fn, I had to clean this up and make the whole stack async.

  • There's a bug that breaks lifetimes in impl<'a> Foo<'a> { fn new(…) -> Self }. Writing Foo<'a> instead of Self works around it.

  • Pin-boxing of recursive async functions is annoying (requires explicit lifetimes and Send).

  • futures::stream::iter() is useful for porting iterators with filter_map. Otherwise join_all can replace collect::<Vec<_>>().
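For anyone hitting the pin-boxing point above, this is roughly what it looks like, as a minimal sketch using only the standard library. The function sum_to is a made-up example, and block_on here is a tiny hand-rolled executor that exists only to drive it; it is not the one from futures or Tokio.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// A recursive async computation. The future has to be boxed because its
/// type would otherwise contain itself; the explicit `'a` and `Send` are
/// the annotations the boxed trait object needs.
/// (`sum_to` is a hypothetical example function.)
fn sum_to<'a>(values: &'a [u64]) -> Pin<Box<dyn Future<Output = u64> + Send + 'a>> {
    Box::pin(async move {
        match values.split_first() {
            None => 0,
            Some((head, tail)) => *head + sum_to(tail).await,
        }
    })
}

/// Waker that unparks the thread blocked in `block_on`.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Minimal executor: polls the future on the current thread and parks
/// between polls until the waker fires.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(value) => return value,
            Poll::Pending => thread::park(),
        }
    }
}
```

The body stays ordinary async code; only the signature and the Box::pin wrapper change compared to a non-recursive async fn.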


The pickiness simply comes from Tokio not storing the runtime in a global. It uses thread-locals to track the context while inside an async function, but a thread outside the runtime has no way of finding it. So Handle::current().enter(|| only works if you are already inside the context: it is a no-op when it succeeds, and a panic when you are not inside the context.
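To illustrate why a thread-local context is invisible from other threads, here is a small analogy using only std. It is not Tokio's actual implementation; CONTEXT, enter, current and observe are hypothetical stand-ins chosen to mirror the names in this thread.

```rust
use std::cell::RefCell;
use std::thread;

thread_local! {
    // Hypothetical stand-in for a runtime context; not Tokio's real internals.
    static CONTEXT: RefCell<Option<String>> = RefCell::new(None);
}

/// Runs `f` with the context set on the *current* thread only,
/// restoring the previous value afterwards.
fn enter<R>(name: &str, f: impl FnOnce() -> R) -> R {
    let previous = CONTEXT.with(|c| c.replace(Some(name.to_string())));
    let result = f();
    CONTEXT.with(|c| *c.borrow_mut() = previous);
    result
}

/// Looks up the context of whichever thread calls it.
fn current() -> Option<String> {
    CONTEXT.with(|c| c.borrow().clone())
}

/// Returns what `current` sees on the entering thread and on a
/// freshly spawned thread, both queried while "inside" `enter`.
fn observe() -> (Option<String>, Option<String>) {
    enter("runtime-a", || {
        let here = current();
        let elsewhere = thread::spawn(current).join().unwrap();
        (here, elsewhere)
    })
}
```

The spawned thread sees no context even though it was started from inside enter, which is the same reason a saved handle has to be passed explicitly into rayon's worker threads.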

Note that as of Tokio version 0.2.19, we have introduced a block_on method on Handle, so the following is no longer the recommended way to block on futures.

handle.enter(|| futures::executor::block_on(async {...}) );

You should prefer to do this:

handle.block_on(async { ... });