What is the best way to share data with a CPU-bound tokio task?

I've got some code which follows roughly this pattern:

impl Client {
    fn new() -> Self {
        Client { data: BigData }
    }
    
    async fn run(&self) -> Result<()> {
        // do some async work here...

        // then do some cpu bound work...
        tokio::task::spawn_blocking(|| {
            cpu_bound_task(&self.data);
        }).await?;

        // then do some more async work
    }
}

Playbround link

self holds some data that I'd like to share with a cpu_bound_task (which will run for 10 to 20 seconds) and there's also some async work that is related.

Ideally I'd just mix the CPU bound work directly with the async work, but to not block the async runtime I'm wrapping the work with spawn_blocking. That requres 'static access to the data on self, so I've been wrapping the data in an Arc<T>. That gets the job done, but it seems like more ceremony than I'd like.

Is there a more straight forward way to do this?

Ideally the code would look like this:

    async fn run(&self) -> Result<()> {
        let x = async_work().await?;
        cpu_bound_task(&self.data, x)?;
        self.more_async_work().await?;
    }

I'll be running multiple tasks at once, so ideally I'd like to limit the number of CPU bound tasks running at once to the number of CPU cores I have. Is there a way to set up the Tokio runtime so that it limits the number of threads anyway, without starving other async tasks? Or some other way to arrange this so that it is easy to switch between async code and CPU bound code?

I don’t have a solution here, but can illuminate a bit of the problem:

In the non-async world, spawning threads requires ’static because the compiler can’t guarantee anything about the ordering of the operations on the two threads: the child thread could potentially leak or store anything away forever.

crossbeam’s scoped threads work around this by forcing the child threads to end before the main thread resumes; presumably, it would be possible to implement a similar API for blocking async requests.

1 Like

There are a few approaches:

  1. Clone the data.
  2. Share the data using Arc.
  3. Temporarily take out the data from self and give it by value, then put it back afterwards.
  4. The data has always been owned by a non-async thread, and async just uses message passing to talk to it.
  5. :warning: footgun :warning: Use block_in_place.
3 Likes

@alice just curious, would it be feasible to add an api to tokio that’s effectively a block_in_place but off thread so its not quite as dangerous to async functionality but would allow for borrowing of data. I’m thinking something with a signature like

fn block_off_thread<F, R>(f: F) -> R where F: FnOnce() -> R + Send

It seems like something like that would be useful for this scenario. I'm not sure though if there are other implications to an api like this though.

(Also, there is almost certainly a better name for it)

I was trying to come up with something similar, but I think it'd need to be a macro, so that it can ensure the returned future is actually awaited-- it would be unsafe to forget because it's protecting the lifetime constraints.

Other than that caveat, I suspect this would be safe:

{
    let f = unsafe { /* transmute f to 'static, or whatever other magic crossbeam does */ };
    spawn_blocking(f).await
}

In fact, if you ignore the possibility that someone might poll() the future manually, this is probably safe to just write as an async function: the worker thread won't actually be spawned until the first poll() call, so both awaiting to completion and completely ignoring this Future is safe.


Edit: Thinking about it some more, you've always got the possibility that the next future up the stack gets interrupted and never resumed. That feels like it might be an intractable problem.

That is called a scoped spawn, and no, unfortunately it is impossible in async Rust. It does exist for ordinary threads, e.g. in crossbeam, but it is implemented by calling a function with a closure, and not returning until the threads have all exited. Trying to do this inside asynchronous code will block the thread for the duration of the spawn, which is unacceptable and just as bad as doing the blocking work directly in the thread.

One of the main problems is that a future can be cancelled at any time. E.g. you may have the situation where you are inside the branch of a select! that suddenly cancels you while a task is still running. This doesn't kill the spawned task, hence dangling pointers.

2 Likes

Is something equivalent to what @2e71828 shows where you do spawn blocking in the background and then call await before the function returns doable? Or is that not safe (or no way to make safe)?

It is not safe due to the cancellability of futures, and it is impossible to make safe.

Would making the Future join the spawned thread in its Drop implementation help? Until drop has completed, the compiler has to assume that the object still contains whatever references it held.

I know that using Drop for safety is generally unsound, because keeping a value alive forever is always possible with clever applications of Arc and Mutex; it's the reasoning for mem::forget to not be marked unsafe. That pattern still wouldn't allow a lifetime violation, though, and I don't know whether or not the other means of avoiding Drop in a safe context also provide the same guarantee.

No, Drop would be unsound because the destructor is not guaranteed to be called. Additionally, since destructors are not async, it would essentially involve blocking the async thread until the blocking task exits, and blocking tasks can't be interrupted or otherwise told to exit early in a systematic way.

Well, there are some circumstances where it is guaranteed to be called: RAII guards wouldn't work otherwise. I'm curious about how these (fairly minimal) guarantees interact with the lifetime system, where you may have either inbound or outbound references in play.

This was my intention. It's still a footgun, just a slightly more ergonomic one. The plan was to ensure memory safety by either panicking or deadlocking to prevent the UB.

No, destructors are never guaranteed to be called. You can always mem::forget them away.

I hope this is not possible while there's a live reference to the value being forgotten. Otherwise, we have bigger problems.

It isn't possible to forget a value while it is borrowed, but that isn't the thing you are forgetting in this situation. What you are forgetting is the return value of the fancy spawn function, and that return value is not borrowed, rather the return value borrows from something else.

Right, I think I understand the situation now:

It can be made safe with respect to the task, the same way crossbeam is safe with respect to the main thread. But to be safe overall, you'd need the cooperation of the executor to do something like keep each task inside its own Arc<Mutex<...>> and only try to poll() or cancel a task when it can obtain the lock.

At that point, you've probably eaten away any performance gains you'd get from real async code, and you might as well be using crossbeam and OS threads directly. In any case, this would be a specialty executor, and not tokio's.

I'm not sure what you mean with the Arc<Mutex<...>>.

If the child thread holds an Arc to the root task future and a Mutex lock that the executor would need to call poll(), that should be enough to ensure that the task's memory remains alive and in place for the duration of the child thread, thus ensuring that the internal references remain valid.

Just because the root task stays alive doesn't mean that data inside it will unless you keep it locked for the full duration of the spawn, and if you do that, you get the same footguns as block_in_place.

2 Likes

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.