Comparison of a simple code between Rust and C!

Can you provide your complete program, so I can experiment myself?

Sure, here it is:

extern crate chrono;
extern crate flexi_logger;
#[macro_use]
extern crate log;
extern crate rayon;

use chrono::Local;
use rayon::prelude::*;

use std::sync::Arc;
use std::thread;
static N_THREADS: usize = 4;
static ELEMENTS: usize = 15_999_999_999;

fn main() {
    flexi_logger::Logger::with_env_or_str("info")
        .format(mini)
        .start()
        .unwrap();
    let mut data = vec![1u8; ELEMENTS];
    data[ELEMENTS - 1] = 123;
    data[777] = 225;

    let data = Arc::new(data);
    assert_eq!(data[77], 1, "1");
    assert_eq!(data[777], 225, "2");
    assert_eq!(data[ELEMENTS - 2], 1, "3");
    assert_eq!(data[ELEMENTS - 1], 123, "4");

    let start = Local::now();
    let m = single_threaded(data.clone());
    debug!("Max: {}", m);
    info!(
        "single_threaded version: {} ms",
        (Local::now() - start).num_milliseconds()
    );

    let start = Local::now();
    let m = single_threaded_max(data.clone());
    debug!("Max: {}", m);
    info!(
        "single_threaded_max version: {} ms",
        (Local::now() - start).num_milliseconds()
    );

    let start = Local::now();
    let m = rayon_based(data.clone());
    debug!("Max: {}", m);
    info!(
        "rayon_based version: {} ms",
        (Local::now() - start).num_milliseconds()
    );

    let start = Local::now();
    let m = rayon_based_max(data.clone());
    debug!("Max: {}", m);
    info!(
        "rayon_based_max version: {} ms",
        (Local::now() - start).num_milliseconds()
    );

    let start = Local::now();
    let m = rayon_based_tuned(data.clone());
    debug!("Max: {}", m);
    info!(
        "rayon_based_tuned version: {} ms",
        (Local::now() - start).num_milliseconds()
    );

    let start = Local::now();
    let m = improved_original(data.clone());
    debug!("Max: {}", m);
    info!(
        "improved_original version: {} ms",
        (Local::now() - start).num_milliseconds()
    );

    let start = Local::now();
    let m = original(data.clone());
    debug!("Max: {}", m);
    info!(
        "Original version: {} ms",
        (Local::now() - start).num_milliseconds()
    );
}

fn single_threaded(data: Arc<Vec<u8>>) -> u8 {
    data.iter()
        .fold(std::u8::MIN, |m, elem| std::cmp::max(m, *elem))
}

fn single_threaded_max(data: Arc<Vec<u8>>) -> u8 {
    data.iter().max().unwrap().clone()
}

fn rayon_based(data: Arc<Vec<u8>>) -> u8 {
    data.par_iter()
        .reduce(|| &std::u8::MIN, |a, b| std::cmp::max(a, b))
        .clone()
}

fn rayon_based_max(data: Arc<Vec<u8>>) -> u8 {
    data.par_iter().max().unwrap().clone()
}

fn rayon_based_tuned(data: Arc<Vec<u8>>) -> u8 {
    data.par_iter()
        .with_min_len(1_000_000)
        .reduce(|| &std::u8::MIN, |a, b| std::cmp::max(a, b))
        .clone()
}

fn improved_original(data: Arc<Vec<u8>>) -> u8 {
    //------------------------------------------------
    let mut threads = vec![];
    for i in 0..N_THREADS {
        let v = data.clone();
        threads.push(thread::spawn(move || {
            let low = i * (ELEMENTS / N_THREADS);
            let high = if (i == N_THREADS - 1) && ((ELEMENTS % N_THREADS) != 0) {
                ELEMENTS
            } else {
                low + ELEMENTS / N_THREADS
            };

            v[low..high]
                .iter()
                .fold(std::u8::MIN, |m, elem| std::cmp::max(m, *elem))
        }));
    }
    threads
        .into_iter()
        .map(|t| t.join().unwrap())
        .max()
        .unwrap()
}

fn original(data: Arc<Vec<u8>>) -> u8 {
    //------------------------------------------------
    let mut threads = vec![];
    for i in 0..N_THREADS {
        let v = data.clone();
        threads.push(thread::spawn(move || {
            let low = i * (ELEMENTS / N_THREADS);
            let high = if (i == N_THREADS - 1) && ((ELEMENTS % N_THREADS) != 0) {
                ELEMENTS
            } else {
                low + ELEMENTS / N_THREADS
            };

            let mut menor = v[low];
            for k in (low + 1)..high {
                if menor <= v[k] {
                    menor = v[k];
                }
            }
            menor
        }));
    }
    threads
        .into_iter()
        .map(|t| t.join().unwrap())
        .max()
        .unwrap()
}

use log::Record;
use std::io;

fn mini(w: &mut io::Write, record: &Record) -> Result<(), io::Error> {
    write!(w, "[{}] {}", record.level(), &record.args())
}

The faster crate (built from a recent GitHub master) might be a good target to try. (Using SIMD directly is far from simple. :wink:)

In Rayon's defense, reduce implies serial execution in many contexts. For example, in the term "MapReduce", the "Map" part converts the original data into cheaply composable pieces in parallel, and the "Reduce" part then composes them serially.

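So this code should be a fairer comparison:

fn rayon_based_fair(data: &[u8]) -> u8 {
    // Slices provide `chunks` directly; `iter()` has no such method.
    let slices: Vec<&[u8]> = data.chunks(ELEMENTS / N_THREADS).collect();
    // Take each chunk's maximum by value so the function can return a `u8`.
    slices.par_iter().map(|s| s.iter().cloned().max().unwrap()).max().unwrap()
}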
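
For anyone who wants to try the map-then-reduce split described above without pulling in Rayon, here is a minimal sketch using only the standard library. The function name `map_then_reduce_max` and the rounding-up chunk length are my own choices for illustration, not something from the posts above, and it needs `std::thread::scope` (Rust 1.63 or later).

```rust
use std::thread;

/// Map step: each thread computes the maximum of its own chunk in parallel.
/// Reduce step: the per-chunk maxima are combined serially on the calling thread.
/// (Hypothetical helper, not part of the original program.)
fn map_then_reduce_max(data: &[u8], n_threads: usize) -> u8 {
    if data.is_empty() {
        return u8::MIN;
    }
    // Round the chunk length up so that at most `n` chunks are produced.
    let n = n_threads.max(1);
    let chunk_len = (data.len() + n - 1) / n;

    let partial: Vec<u8> = thread::scope(|scope| {
        // Spawn one scoped thread per chunk before joining any of them.
        let handles: Vec<_> = data
            .chunks(chunk_len)
            .map(|chunk| scope.spawn(move || chunk.iter().copied().max().unwrap_or(u8::MIN)))
            .collect();
        handles.into_iter().map(|h| h.join().unwrap()).collect()
    });

    // Serial reduce over a handful of values.
    partial.into_iter().max().unwrap_or(u8::MIN)
}

fn main() {
    let mut data = vec![1u8; 1_000];
    data[777] = 225;
    println!("max = {}", map_then_reduce_max(&data, 4));
}
```

Scoped threads borrow the slice directly, so no `Arc` is needed in this variant.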

I put #[inline(never)] at the entry function of each of your tests, just to be sure things are separate. I also initialized the threadpool manually to match your N_THREADS like this:

    rayon::ThreadPoolBuilder::new()
        .num_threads(N_THREADS)
        .build_global()
        .unwrap();

My initial run of your code on an i7-7700k got these results:

[INFO] single_threaded version: 579 ms
[INFO] single_threaded_max version: 3836 ms
[INFO] rayon_based version: 963 ms
[INFO] rayon_based_max version: 1594 ms
[INFO] rayon_based_tuned version: 931 ms
[INFO] improved_original version: 398 ms
[INFO] Original version: 1831 ms

Next I changed single_threaded_max to the following, and now it performs just as well as single_threaded, both around 580ms.

fn single_threaded_max(data: Arc<Vec<u8>>) -> u8 {
    data.iter().cloned().max().unwrap_or(std::u8::MIN)
}
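
To make the difference between the two versions easier to see in isolation, here is a small sketch with both side by side. The names `max_by_ref` and `max_by_value` are made up for this example, and I use `copied()` where the post uses `cloned()`; for `u8` the two do the same thing. The only claim here is that both return the same value; the timings are the ones measured above, and I have not tried to explain them.

```rust
/// Compares through `&u8` references and copies the winner out at the end,
/// like the original `single_threaded_max` (but without panicking on empty input).
fn max_by_ref(data: &[u8]) -> u8 {
    data.iter().max().copied().unwrap_or(u8::MIN)
}

/// Copies each element first, so `max` compares plain `u8` values.
fn max_by_value(data: &[u8]) -> u8 {
    data.iter().copied().max().unwrap_or(u8::MIN)
}

fn main() {
    let data = [1u8, 225, 1, 123];
    println!("by ref: {}, by value: {}", max_by_ref(&data), max_by_value(&data));
}
```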

A similar change to rayon_based brings it down to 403ms, within noise of your improved_original. Neither is great for 4 threads compared to single_threaded, but this test is actually constrained by memory bandwidth anyway, since it uses a 16GB data vector: 16GB/400ms = 40GB/s = 37.25GiB/s.

fn rayon_based(data: Arc<Vec<u8>>) -> u8 {
    data.par_iter()
        .cloned()
        .reduce(|| std::u8::MIN, |a, b| std::cmp::max(a, b))
}
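
The bandwidth arithmetic above can be checked with a few lines. This is only a sketch: the helper name `throughput` is hypothetical, and it rounds the vector to 16,000,000,000 bytes as the post does (ELEMENTS is actually one byte short of that).

```rust
/// Returns (GB/s, GiB/s) for `bytes` processed in `millis` milliseconds.
/// GB uses 10^9 bytes, GiB uses 2^30 bytes.
fn throughput(bytes: u64, millis: u64) -> (f64, f64) {
    let bytes_per_sec = bytes as f64 * 1000.0 / millis as f64;
    let gb_per_sec = bytes_per_sec / 1e9;
    let gib_per_sec = bytes_per_sec / (1u64 << 30) as f64;
    (gb_per_sec, gib_per_sec)
}

fn main() {
    let (gb, gib) = throughput(16_000_000_000, 400);
    println!("{:.2} GB/s = {:.2} GiB/s", gb, gib);
}
```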

The same tweak on rayon_based_tuned also gets around 400ms. I'm actually relieved that tuning is apparently not much of a problem here.

rayon_based_max is disappointing. Making the same change as for single_threaded_max, i.e. data.par_iter().cloned().max().unwrap_or(std::u8::MIN), actually makes it much worse!

ParallelIterator::max() is implemented in terms of reduce_with, which we sort of discourage in general use since it's heavy on Option wrapping. I'm going to play with this in Rayon and see if I can do better with a more direct implementation.
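
To illustrate what "Option wrapping" means here, the sketch below uses the sequential standard-library analogues, not Rayon itself: `Iterator::reduce` has no identity value and so has to return an `Option`, much like `reduce_with`, while `fold` with an identity returns a plain value, much like `reduce` with `|| std::u8::MIN`. This is an analogy only and says nothing about how Rayon implements either method; the function names are invented for the example.

```rust
/// No identity value: an empty input has no result, so the answer is an `Option`.
fn max_without_identity(data: &[u8]) -> Option<u8> {
    data.iter().copied().reduce(|a, b| a.max(b))
}

/// With `u8::MIN` as the identity, every step works on plain `u8` values.
fn max_with_identity(data: &[u8]) -> u8 {
    data.iter().copied().fold(u8::MIN, |a, b| a.max(b))
}

fn main() {
    let data = [3u8, 200, 7];
    println!("{:?}", max_without_identity(&data));
    println!("{}", max_with_identity(&data));
}
```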

@Hyeonu - there's also par_chunks, so you don't need an intermediate slices collection. I added it to the tests like the following, and it also completes in around 400ms.

fn rayon_based_chunks(data: Arc<Vec<u8>>) -> u8 {
    data.par_chunks(ELEMENTS / N_THREADS)
        .map(|slice| slice.iter().cloned().max().unwrap_or(std::u8::MIN))
        .max()
        .unwrap_or(std::u8::MIN)
}
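
One detail worth knowing about the chunk size: because ELEMENTS / N_THREADS rounds down, 15_999_999_999 elements split into chunks of 3_999_999_999 gives four full chunks plus a fifth chunk of 3 elements. The sketch below shows the same effect on a small slice with the standard library's `chunks`, assuming `par_chunks` splits the same way (the last chunk may be shorter). The helper names are hypothetical, and both helpers assume `n_threads > 0` and `data.len() >= n_threads`. Whether the extra tiny chunk matters for the timings was not measured in this thread.

```rust
/// Number of chunks when the chunk length is rounded down, as in `ELEMENTS / N_THREADS`.
fn chunks_rounding_down(data: &[u8], n_threads: usize) -> usize {
    data.chunks(data.len() / n_threads).count()
}

/// Number of chunks when the chunk length is rounded up instead.
fn chunks_rounding_up(data: &[u8], n_threads: usize) -> usize {
    data.chunks((data.len() + n_threads - 1) / n_threads).count()
}

fn main() {
    let data = vec![1u8; 11];
    println!("rounding down: {} chunks", chunks_rounding_down(&data, 4));
    println!("rounding up:   {} chunks", chunks_rounding_up(&data, 4));
}
```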