From Rayon to Tokyo

Please consider this exercise:

use rayon::prelude::*;
fn rosza(m: u32, n: u32) -> u32 {
    match (m, n) {
        (0, n) => n + 1,
        (m, 0) => rosza(m - 1, 1),
        (m, n) => rosza(m - 1, rosza(m, n - 1)),
    }
}
async fn reckon() {
    rayon::spawn(move || {
        (1..=10).into_par_iter().for_each(|_| {
            let result = rosza(3, 5);
            let idx = rayon::current_thread_index().unwrap();
            println!("{} {}", idx, result);
        });
    });
}
#[tokio::main]
async fn main() {
    println!("{:?}",reckon().await);
}

This performs not so bad and it can be seen nicely how it is spread over the pool:

()                                                                                            
0 253                                                                                         
5 253                                                                                         
1 253                                                                                         
0 253                                                                                         
5 253                                                                                         
1 253                                                                                         
0 253                                                                                         
4 253                                                                                         
5 253                                                                                         
0 253  

What i couldn’t manage yet is to send the result over a channel back to Tokyo.
I tried desperately to guess - without success so far:

// let (tx, mut rx) = mpsc::channel::<u32>(10);                                                  
// tx.send(result);                                                                              
// while let Some(thing) = rx.recv().await {                                                     
//    println!("{}", thing);                                                                     
// }                                                                                             

This and some other unfortunate variants yield None.

Thanks for any hint.

P.S.: I wanted something similar like in the blog from Alice:

async fn psum_iter(nums: Vec<i32>) -> i32 {
    let (tx, rx) = tokio::sync::oneshot::channel();
    rayon::spawn(move || {
        let sum = nums.par_iter().sum();
        let _ = tx.send(sum);
    });
    rx.await.expect("Panic in rayon::spawn")

This works as expected for me:

async fn reckon() {
    let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
    rayon::spawn(move || {
        (1..=10).into_par_iter().for_each(|_| {
            let result = rosza(3, 5);
            let idx = rayon::current_thread_index().unwrap();
            println!("{} {}", idx, result);
            let _ = tx.send(result);
        });
    });
    while let Some(thing) = rx.recv().await {                                                     
       println!("receive {}", thing);                                                                     
    }
}

I didn’t change reckon() to actually return a value because it isn't clear what you want to do with the 10 values.

If this doesn’t help, please provide complete but non-working code (I wasn’t sure where you were planning to put the code you provided in comments) and describe in more detail what behavior you want it to have.

I'm trying to figure out what I did:

It must be let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); ,
not let (tx, mut rx) = mpsc::channel::<u32>(10); like in my example. The latter yields None. Holy Joe!

Of course, the function should return the value - but that's not a problem now.

Thank you very much.

P.S. For completeness the working version:

async fn reckon() -> Vec<u32> {
    let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
    // let (tx, mut rx) = mpsc::channel::<u32>(10);                                               
    rayon::spawn(move || {
        (1..=10).into_par_iter().for_each(|_| {
            let result = rosza(3, 13);
            // let idx = rayon::current_thread_index().unwrap();                                 
            let _ = tx.send(result);
        });
    });
    let mut result = Vec::<u32>::new();
    while let Some(thing) = rx.recv().await {
        // println!("receive {}", thing);                                                        
        result.push(thing);
    }
    result
}

With rosza(3, 13) (see also):

karl@rantanplan:~/src/rust/playground/gizmo$ time ../target/release/scratchpad
[gizmo/src/bin/scratchpad.rs:48:5] result = [
    65533,
    65533,
    65533,
    65533,
    65533,
    65533,
    65533,
    65533,
    65533,
    65533,
]

real    0m5.625s
user    0m28.641s
sys     0m0.013s

If you use mpsc::channel(), then the sender's send() method is async, so it doesn't do anything unless awaited. You should have gotten a warning about the unused future. Don’t ignore warnings while you are learning!

You could use Sender::blocking_send() in that case, but it's not good to block inside of a Rayon context — if nothing else, it occupies capacity of the thread pool and thus reduces the total possible throughput, and it can also cause deadlocks in some cases. It’s much better to use an unbounded channel in this scenario (where the amount of output will be bounded by the input).

2 Likes

Yes sure, that is right. But unfortunately there was no warning - I just tried to reproduce it. I first thought that it has gone down between some warnings because of unused imports and so one…but there was none.