How can I send data from tokio to rayon?

```rust
pub fn rayon_task() -> Result<String, String> {
    loop {
        // need a way to get json data in this rayon function
        // so that each time the tokio_task() function
        // sends data, the loop will run again
    }
}

pub async fn tokio_task() {
    loop {
        // send json to the rayon_task function
    }
}
```

I have two functions started from main; one uses tokio, the other rayon.

The tokio tasks are a series of outgoing connections to other servers, so tokio, which is built for IO-bound work, is perfect for this. These connections are persistent and never close. When data is received from one of these connections, I need to process it, and this is incredibly CPU-intensive work. I cannot block the tokio connections, as the work may take several hours to complete and I need the connections to remain active and open.

I am trying to figure out some way of sending the data to the rayon process to handle this. Doing the work in spawned tokio tasks can take as long as 6 to 8 hours, whereas with rayon I can get that down to between 30 minutes and 2 hours, depending on the load.

The problem is I can't figure out a non-blocking way to pass the data between the runtimes. I either run into errors about not being able to wait for futures, or into errors about not being able to drop threads of one runtime from threads of another runtime.

I have tried every combination I can think of but just can't seem to get it to work.

I need suggestions as to what might work.

What exactly have you tried? Have you tried sending the json from tokio to rayon via a simple channel? tokio's mpsc channel for example supports blocking receiving (so you can wait for new messages in a non-async context) while also supporting non-blocking send, so your tokio producers don't get blocked.


As @jofas mentioned, you can use tokio's mpsc channel to communicate with rayon's threads. Here's a more hands-on article with more info: Async: What is blocking? – Alice Ryhl

If I understand your problem description correctly, executing something through rayon without blocking in tokio might be as simple as

```rust
async fn execute_on_rayon_pool<R: Send + 'static>(f: impl FnOnce() -> R + Send + 'static) -> R {
    let (tx, rx) = tokio::sync::oneshot::channel();
    rayon::spawn(move || {
        // Ignore a send error: it only means the awaiting future was dropped (cancelled).
        let _ = tx.send(f());
    });
    rx.await.unwrap()
}
```

To be more thorough, you probably also want to handle the case where f() panics, so this becomes

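```rust
use std::panic;
use tokio::sync::oneshot;

async fn execute_on_rayon_pool<R: Send + 'static>(f: impl FnOnce() -> R + Send + 'static) -> R {
    let (tx, rx) = oneshot::channel();
    rayon::spawn(move || {
        // Catch a panic in f so it is handed to the caller instead of hitting the rayon pool.
        let result = panic::catch_unwind(panic::AssertUnwindSafe(f));
        // Ignore a send error: it only means the caller stopped waiting.
        let _ = tx.send(result);
    });
    // Re-raise any caught panic on the awaiting (tokio) side.
    rx.await.unwrap().unwrap_or_else(|caught_panic| panic::resume_unwind(caught_panic))
}
```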

I also found a crate that offers this sort of adapter: tokio_rayon.

Of course, if more back-and-forth is necessary, tokio’s other channel types also support both blocking and non-blocking interaction. On the tokio side, make sure that you use the channel’s non-blocking APIs; on the rayon side, make sure not to fully block threads either (for instance, by waiting on a channel). Keeping rayon threads busy with computations (something that would count as “blocking” in the context of async executors, too) is of course fine and is their intended purpose.


I have tried using standard mpsc channels, sync channels and unbounded channels; the compiler complains that the sender is sending a future and the receiver cannot await a future in a non-async function.

I have also tried sending via Arc<Mutex>, I have tried sending via crossbeam, and I have tried calling the function directly.

the compiler complains that the sender is sending a future and the receiver cannot await a future in a non-async function.

The work to be done on the rayon thread pool needs to be expressed as a function (closure) that does the work directly: not an async function, not a Future, and not even a blocking function (blocking inside the rayon thread pool reduces its capacity to do computation). That's what @steffahn's example code does; you should use it or something very similar.

I have come to the conclusion that Rust just can't do what I need. The problem seems to be that I am sending data from an async function in tokio to a sync function in rayon, and there seems to be no way to do this. If it were a one-off where oneshot would work, that would be great, but it's not. It's something that needs to happen over and over and over again.

mpsc doesn't work because when you send a message to the rayon runtime from tokio, it won't await the data. Oneshot doesn't work because it's a one-time message, not something I can send, process, and then await more data. Like I said, I have tried every single method people have recommended and a lot more. From every angle, it seems Rust just isn't cut out for combining async and CPU tasks efficiently.

Even ChatGPT, after weeks on end, couldn't make it work and recommended I use a different language, so I thought I would ask here first, but it seems no real solution exists.

The real problem seems to be that the data comes in from other servers over tokio-bound connections, so there is no way to get the data to a rayon process again and again, because rayon will not sit and wait for the information except with oneshot, which is single-use. In all my other attempts, I have found that sending over a tokio channel makes it a future, and a rayon task can't await a future.

It's disheartening, after spending thousands of hours on my project, to run face-first into a wall, but live and learn.

I thank everyone for their advice.

It sounds to me as if you’re conceptually mixing up (purely!) CPU-bound tasks with ones that merely involve non-async, blocking functionality. Waiting[1] for more data simply isn’t something you should be doing inside the rayon thread pool; this has been pointed out multiple times now. The loop that receives the data and moves on to kick off the processing steps for it should really be running either as an ordinary thread or maybe even as an async task. Only the pure computational steps, i.e. everything up to the point of waiting for more input, should be done inside rayon; then the job returns its intermediate result and is done running for the moment.

Incidentally, the whole point of rayon is also that you can (and should) spawn lots of small-ish computation tasks on it. A long-running loop communicating over persistent input and output channels is entirely against the intended model of interacting with rayon. It’s deliberately designed so that many small computation tasks can be spawned with very low overhead and scheduled efficiently on a pool; alternatively, bigger computation tasks (still non-blocking, and not a loop whose control flow handles input and output channels) can easily use rayon to split themselves up further on the rayon “runtime”.


That being said, Rust does not prevent you from doing any of this either. “Rust can’t do X” isn’t the right conclusion here. @kpreid has linked you to the exact method, on tokio’s mpsc channel receiver, that can execute the receiving directly in a non-async context, i.e. in a blocking manner.

yet you claim

Or if this isn’t about receiving but sending, then well… there’s a blocking_send method, too, of course! Pay attention to which of them say async fn and which ones don’t.

Don’t make the mistake of overestimating ChatGPT’s knowledge and problem-solving ability. I have just tried a handful of queries myself to get a picture, combining parts of your answers for context and asking for help, and the feedback indeed seemed largely useless and often misleading to me. Assuming your reaction to such feedback or ideas from ChatGPT would (rightfully) have been “this doesn’t work because …” a number of times, it would simply believe you. If you tell it “I couldn’t get this to work, there might be no way, should I try a different language?”, these language models will always tell you “yes”. Unless you are questioning the most straightforward and obvious kinds of facts, they will almost never push back and insist that something is possible, even when that is the truth.


  1. You say “await”, so I assume you mean “wait”, rather than literally referring to the await keyword while actually meaning pulling from a channel that’s known to always have data available immediately.
