I'm trying to understand whether tokio and rayon could improve a process of mine that handles a large number of events. To simulate what happens in the real application, I have a function, fill_vec, that generates some random bytes for processing.
In production, I'd hand these off to a worker process and never expect them back. For this test, I'm just cloning each Vec and cycling through them as needed until I reach the desired number of iterations.
My observation from the code is that neither Tokio nor Rayon adds any performance improvement. When I watch a machine running the program with btop, I never see the processors maxed out. This leads me to believe that there is a critical flaw in my approach. I've run this on both an Intel and an Arm Cortex device with the same results.
The playground is attached. Any suggestions on how to improve either approach would be appreciated. If there's another approach that could yield better results, feel free to share that, too. None of the examples I've seen for either library seems difficult, so I may be implementing this in a way that undermines the optimizations the libraries try to make.
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::thread;
use std::time::Duration;
use rand::Rng;
use tokio::{task, time::Instant};
use tokio::sync::mpsc;
fn fill_vec(count: u8) -> Vec<u8> {
    let mut rng = rand::thread_rng();
    (0..count).map(|_| rng.gen()).collect()
}
// Simulate the actual process, which takes about 2us.
fn process_data(_data: &[u8]) -> bool {
    thread::sleep(Duration::from_micros(2));
    true
}
fn do_linear_process(data: &Vec<Vec<u8>>, max_iterations: u32) {
    let mut iteration = 0;
    let iteration_start = Instant::now();
    let mut elements = 0;
    loop {
        iteration += 1;
        if iteration > max_iterations {
            break;
        }
        // simply classify the frame
        let data = data[iteration as usize % 100].clone();
        let val = process_data(&data);
        if val {
            elements += 1;
        }
    }
    let iteration_elapsed = iteration_start.elapsed();
    let time_elements = iteration_elapsed.as_micros() / elements;
    println!("Iterated {} elements in {:?}us at {} us/element (linear)", elements, iteration_elapsed.as_micros(), time_elements);
}
// Rayon approach
fn do_parallel_rayon(data: &Vec<Vec<u8>>, max_iterations: u32) {
    let start_time = Instant::now();
    let elements = (0..max_iterations)
        .map(|i| {
            let chunk = &data[i as usize % 100];
            if process_data(chunk) {
                1
            } else {
                0
            }
        })
        .sum::<u32>();
    let elapsed = start_time.elapsed();
    let time_per_element = elapsed.as_micros() / elements as u128;
    println!(
        "Iterated {} elements in {:?}us at {}us/element (Rayon)",
        elements,
        elapsed.as_micros(),
        time_per_element
    );
}
async fn do_tokio_mpsc(data: &Vec<Vec<u8>>, max_iterations: u32) {
    println!("---- Repeating with tokio mpsc threading ----");
    // now we'll repeat the last part with some threading action...
    let mut iteration = 0;
    let iteration_start = Instant::now();
    let elements = Arc::new(AtomicU64::new(0));
    let (data_tx, mut data_rx) = mpsc::channel::<bool>(2000);
    let (frame_tx, mut frame_rx) = mpsc::channel::<Vec<u8>>(2000);
    // fully decode the frame
    tokio::spawn(async move {
        while let Some(frame) = frame_rx.recv().await {
            let was_successful = process_data(&frame);
            let _ = data_tx.send(was_successful).await;
        }
    });
    let elements_clone = elements.clone();
    let process = || async move {
        while let Some(_) = data_rx.recv().await {
            elements_clone.fetch_add(1, Ordering::SeqCst);
        }
        println!("Finished receiving {} elements with tokio.", elements_clone.load(Ordering::SeqCst));
    };
    let handle = task::spawn(process());
    loop {
        iteration += 1;
        if iteration > max_iterations {
            drop(frame_tx);
            break;
        }
        let data = data[iteration as usize % 100].clone();
        let _ = frame_tx.send(data).await;
    }
    tokio::spawn(async move {
        match handle.await {
            Ok(_) => println!("Tokio task completed successfully."),
            Err(e) => println!("Tokio task failed: {:?}", e),
        }
    });
    let iteration_elapsed = iteration_start.elapsed();
    if elements.load(Ordering::SeqCst) != 0 {
        let time_elements = iteration_elapsed.as_micros() / elements.load(Ordering::SeqCst) as u128;
        println!("Iterated {} elements in {:?}us at {} us/element (Tokio/mpsc)", elements.load(Ordering::SeqCst), iteration_elapsed.as_micros(), time_elements);
    } else {
        println!("Elements is 0. Time to complete was: {}us -- shouldn't be here...", iteration_elapsed.as_micros());
    }
}
#[tokio::main]
async fn main() {
    // create random data to simulate actual data
    let mut frame_data: Vec<Vec<u8>> = Vec::new();
    for _ in 0..100 {
        frame_data.insert(0, fill_vec(128));
    }
    // maximum iterations for the demo...
    let max_iterations = 100_000;
    let data_arc = Arc::new(frame_data);
    // process the data in a single, linear thread
    let clone_data = data_arc.to_vec();
    let max_iter = max_iterations.clone();
    tokio::task::spawn_blocking(move || {
        do_linear_process(&clone_data, max_iter);
    });
    do_tokio_mpsc(&data_arc, max_iterations).await;
    let clone_data = data_arc.to_vec();
    let max_iter = max_iterations.clone();
    tokio::task::spawn_blocking(move || {
        do_parallel_rayon(&clone_data, max_iter);
    });
}
Output:
---- Repeating with tokio mpsc threading ----
Iterated 100000 elements in 6856170us at 68 us/element (linear)
Iterated 97985 elements in 9190120us at 93 us/element (Tokio/mpsc)
Iterated 100000 elements in 6813661us at 68us/element (Rayon)
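One thing that stands out to me in the output: the linear run reports about 68 us/element, even though process_data only asks for a 2 us sleep. As far as I understand, thread::sleep only guarantees a minimum duration, so the simulated workload may be dominated by OS timer and scheduler overhead rather than by the 2 us I intended. Below is a minimal, std-only sketch I could use to check that. The helper names average_sleep, spin_for and average_spin are hypothetical and not part of the program above, and the busy-wait is only an assumption about how a CPU-bound stand-in might look.

```rust
use std::thread;
use std::time::{Duration, Instant};

/// Average wall-clock cost of one `thread::sleep(requested)` call,
/// measured over `rounds` consecutive calls.
fn average_sleep(requested: Duration, rounds: u32) -> Duration {
    let start = Instant::now();
    for _ in 0..rounds {
        thread::sleep(requested);
    }
    start.elapsed() / rounds
}

/// Hypothetical stand-in for CPU-bound work: busy-wait until
/// `requested` has elapsed instead of yielding to the OS.
fn spin_for(requested: Duration) {
    let start = Instant::now();
    while start.elapsed() < requested {
        std::hint::spin_loop();
    }
}

/// Average wall-clock cost of one `spin_for(requested)` call.
fn average_spin(requested: Duration, rounds: u32) -> Duration {
    let start = Instant::now();
    for _ in 0..rounds {
        spin_for(requested);
    }
    start.elapsed() / rounds
}

fn main() {
    let requested = Duration::from_micros(2);
    let rounds = 1_000;
    // Both averages are at least `requested`; how far above depends on the machine.
    println!("sleep: {:?} per call", average_sleep(requested, rounds));
    println!("spin:  {:?} per call", average_spin(requested, rounds));
}
```

If the sleep average comes out far above 2 us on my machines, that would at least explain the per-element time of the linear run, and it would suggest that a sleeping workload is not a good model for CPU-bound processing when judging how busy the cores look in btop.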