Suggestions for Multithreading Approach Improvements

I'm trying to understand how tokio and rayon could potentially improve a process of mine that handles a lot of events. To simulate what's going on in the real application, I've got a function fill_vec that generates random bytes for processing.

In production, I'd hand these off to a worker process and never expect them back, but for this demo I'm just cloning the Vec as needed until I reach the desired number of iterations.

My observation from the code is that neither Tokio nor Rayon adds any performance improvement. When I watch a machine running the program with btop, I never see the processors maxed out. This leads me to believe that I've got a critical flaw in my approach. I've run this on both an Intel and an ARM Cortex device with the same results.

The playground is attached. Any suggestions for improving either approach would be appreciated. If there's another approach that could yield better results, feel free to share that, too. None of the examples I've seen for either library seem difficult, so I may be implementing this in a way that undermines the optimizations the libraries try to make.

use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::thread;
use std::time::Duration;

use rand::Rng;
use tokio::{task, time::Instant};
use tokio::sync::mpsc;

fn fill_vec(count: u8) -> Vec<u8> {
    let mut rng = rand::thread_rng();
    (0..count).map(|_| rng.gen()).collect()
}

// simulate the actual process, which takes about 2us.
fn process_data(_data: &Vec<u8>) -> bool {
    thread::sleep(Duration::from_micros(2));
    true
}

fn do_linear_process(data: &Vec<Vec<u8>>, max_iterations: u32) {
    let mut iteration = 0;
    let iteration_start = Instant::now();
    let mut elements = 0;
    loop {
        iteration += 1;
        if iteration > max_iterations {
            break;
        }
        // simply classify the frame
        let data = data[iteration as usize % 100].clone();
        let val = process_data(&data);
        if val {
            elements += 1;
        }
    }
    let iteration_elapsed = iteration_start.elapsed();
    let time_elements = iteration_elapsed.as_micros() / elements;
    println!("Iterated {} elements in {:?}us at {} us/element (linear)", elements, iteration_elapsed.as_micros(), time_elements);
}

// Rayon approach
fn do_parallel_rayon(data: &Vec<Vec<u8>>, max_iterations: u32) {
    let start_time = Instant::now();
    let elements = (0..max_iterations)
        .map(|i| {
            let chunk = &data[i as usize % 100];
            if process_data(chunk) {
                1
            } else {
                0
            }
        })
        .sum::<u32>();

    let elapsed = start_time.elapsed();
    let time_per_element = elapsed.as_micros() / elements as u128;
    println!(
        "Iterated {} elements in {:?}us at {}us/element (Rayon)",
        elements,
        elapsed.as_micros(),
        time_per_element
    );
}

async fn do_tokio_mpsc(data: &Vec<Vec<u8>>, max_iterations: u32) {

    println!("---- Repeating with tokio mpsc threading ----");
    // now we'll repeat the last part with some threading action...
    let mut iteration = 0;
    let iteration_start = Instant::now();
    let elements = Arc::new(AtomicU64::new(0));
    let (data_tx, mut data_rx) = mpsc::channel::<bool>(2000);
    let (frame_tx, mut frame_rx) = mpsc::channel::<Vec<u8>>(2000);

    // fully decode the frame
    tokio::spawn(async move {
        while let Some(frame) = frame_rx.recv().await {
            let was_successful = process_data(&frame);
            let _ = data_tx.send(was_successful).await;
        }
    });

    let elements_clone = elements.clone();
    let process = || async move {
        while let Some(_) = data_rx.recv().await {
            elements_clone.fetch_add(1, Ordering::SeqCst);
        }
        println!("Finished receiving {} elements with tokio.", elements_clone.load(Ordering::SeqCst));
    };

    let handle = task::spawn(process());

    loop {
        iteration += 1;
        if iteration > max_iterations {
            drop(frame_tx);
            break;
        }
        let data = data[iteration as usize % 100].clone();
        let _ = frame_tx.send(data).await;
    }

    tokio::spawn(async move {
        match handle.await {
            Ok(_) => println!("Tokio task completed successfully."),
            Err(e) => println!("Tokio task failed: {:?}", e),
        }
    });

    let iteration_elapsed = iteration_start.elapsed();
    if elements.load(Ordering::SeqCst) != 0 {
        let time_elements = iteration_elapsed.as_micros() / elements.load(Ordering::SeqCst) as u128;
        println!("Iterated {} elements in {:?}us at {} us/element (Tokio/mpsc)", elements.load(Ordering::SeqCst), iteration_elapsed.as_micros(), time_elements);
    } else {
        println!("Elements is 0. Time to complete was: {}us -- shouldn't be here...", iteration_elapsed.as_micros());
    }
}


#[tokio::main]
async fn main() {

    // create random data to simulate actual data
    let mut frame_data: Vec<Vec<u8>> = Vec::new();
    for _ in 0..100 {
        frame_data.insert(0, fill_vec(128));
    }

    // maximum iterations for the demo...
    let max_iterations = 100_000;
    let data_arc = Arc::new(frame_data);

    // process the data in a single, linear thread
    let clone_data = data_arc.to_vec();
    let max_iter = max_iterations;

    tokio::task::spawn_blocking(move || {
        do_linear_process(&clone_data, max_iter);
    });

    do_tokio_mpsc(&data_arc, max_iterations).await;

    let clone_data = data_arc.to_vec();
    let max_iter = max_iterations;

    tokio::task::spawn_blocking(move || {
        do_parallel_rayon(&clone_data, max_iter);
    });

}

(Playground)

Output:

---- Repeating with tokio mpsc threading ----
Iterated 100000 elements in 6856170us at 68 us/element (linear)
Iterated 97985 elements in 9190120us at 93 us/element (Tokio/mpsc)
Iterated 100000 elements in 6813661us at 68us/element (Rayon)

I only looked at the rayon one, but... it's not using rayon. So that's your issue. Here it is actually using rayon -- but note that how you use rayon depends heavily on the actual interdependencies in the work you're doing.

fn do_parallel_rayon(data: &Vec<Vec<u8>>, max_iterations: u32) {
    use rayon::prelude::*;
    let start_time = Instant::now();
    let elements = (0..max_iterations).into_par_iter()
        .map(|i| {
            let chunk = &data[i as usize % 100];
            if process_data(chunk) {
                1
            } else {
                0
            }
        })
        .sum::<u32>();
    ...
}
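As a side note, the fan-out/reduce shape that into_par_iter().map(..).sum() hands to rayon's thread pool can be sketched with plain std::thread::scope and no rayon dependency. parallel_count and the worker count of 4 below are illustrative, not from the original code. Also note that because process_data sleeps rather than computes, any speedup comes from overlapping the sleeps -- btop will still show mostly idle cores either way.

```rust
use std::thread;
use std::time::Duration;

// Simulated work, as in the original post.
fn process_data(_data: &[u8]) -> bool {
    thread::sleep(Duration::from_micros(2));
    true
}

// Fan out `max_iterations` units of work across `workers` OS threads
// and sum the per-thread success counts -- a hand-rolled version of
// the splitting that into_par_iter().map(..).sum() does for you.
fn parallel_count(data: &[Vec<u8>], max_iterations: u32, workers: u32) -> u32 {
    thread::scope(|s| {
        (0..workers)
            .map(|w| {
                s.spawn(move || {
                    // Each worker handles every `workers`-th iteration.
                    (w..max_iterations)
                        .step_by(workers as usize)
                        .filter(|i| process_data(&data[*i as usize % data.len()]))
                        .count() as u32
                })
            })
            .collect::<Vec<_>>() // spawn all workers before joining any
            .into_iter()
            .map(|h| h.join().unwrap())
            .sum()
    })
}

fn main() {
    let data: Vec<Vec<u8>> = (0..100).map(|i| vec![i as u8; 128]).collect();
    println!("processed {} elements", parallel_count(&data, 10_000, 4));
}
```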

I see several println! calls, and I think they're made from things that are supposed to run concurrently. println! takes a lock on stdout for each call, so those calls are serialized, which is likely to interfere with your attempt to run things concurrently.
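For reference, the usual way to avoid that per-call locking is to lock stdout once and write through the locked handle. A minimal sketch (write_lines is an illustrative helper, not from the thread):

```rust
use std::io::Write;

// Each println! locks stdout separately, so concurrent tasks serialize
// on that lock line by line. Writing a batch through one locked handle
// acquires the lock once and keeps the lines together.
fn write_lines<W: Write>(out: &mut W, n: u32) {
    for i in 0..n {
        writeln!(out, "line {i}").unwrap();
    }
}

fn main() {
    let stdout = std::io::stdout();
    let mut lock = stdout.lock(); // one lock for the whole batch
    write_lines(&mut lock, 3);
}
```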

Gosh -- I've refactored the code so many times that I factored out the critical step and didn't catch it. I originally had the use rayon::prelude portion at the top of my code -- not sure when I took it out, but it's unfortunate that my incorrect map statement still compiled.

Thanks for that. That portion definitely runs quicker now, as I would have expected.


The println! calls only run after each segment finishes, so they shouldn't be affecting the concurrency in this case.

