My parallel code is slow

Hello Rust developers, I'm new to Rust and my goal is to solve a producer-consumer-style problem. This code is supposed to implement a parallel Laplace expansion for the determinant. The problem is that it is much slower than the sequential implementation. I found that when I pass 12 as the threads_amount parameter, only 1-3 threads are actually running and the others are blocked.

matrix.rs

use std::error;
use std::fmt;
use std::ops::{Index, IndexMut};

pub type MatrixResult<T> = std::result::Result<T, Box<dyn error::Error>>;

#[derive(Debug, Clone)]
pub struct IncorrectDimensions;

impl std::fmt::Display for IncorrectDimensions {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "Incorrect dimensions of a matrix")
    }
}

impl error::Error for IncorrectDimensions {}

pub struct Matrix {
    pub rows: usize,
    pub columns: usize,
    data: Vec<Vec<f64>>,
}

impl Matrix {
    fn create_matrix_data(rows: usize, columns: usize) -> Vec<Vec<f64>> {
        let mut data: Vec<Vec<f64>> = Vec::new();

        for _ in 0..rows {
            data.push(vec![0.0; columns]);
        }

        data
    }

    pub fn new(rows: usize, columns: usize) -> Matrix {
        let data = Self::create_matrix_data(rows, columns);

        Matrix {
            rows,
            columns,
            data,
        }
    }

    pub fn clone(&self) -> Matrix {
        Matrix {
            rows: self.rows,
            columns: self.columns,
            data: self.data.clone(),
        }
    }

    pub fn print(&self) {
        for i in 0..self.rows {
            for j in 0..self.columns {
                print!("{:10}|", self.data[i][j]);
            }
            println!("");
        }
    }
}

impl Index<usize> for Matrix {
    type Output = Vec<f64>;
    fn index<'a>(&'a self, i: usize) -> &'a Vec<f64> {
        &self.data[i]
    }
}

impl IndexMut<usize> for Matrix {
    fn index_mut<'a>(&'a mut self, i: usize) -> &'a mut Vec<f64> {
        &mut self.data[i]
    }
}

main.rs

pub mod matrix;
use crate::matrix::Matrix;
use rand::Rng;
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::thread::JoinHandle;
use std::time::SystemTime;

pub struct Node {
    pub matrix: Matrix,
    pub coefficient: f64,
}

pub struct Unit {
    pub result: Mutex<f64>,
    pub processing: Mutex<usize>,
    pub condvar: (Mutex<Vec<Node>>, Condvar),
}

pub fn laplace_determinant(matrix: &Matrix, level: usize) -> f64 {
    let mut det: f64;

    if level == 1 {
        det = matrix[0][0];
    } else if level == 2 {
        det = matrix[0][0] * matrix[1][1] - matrix[1][0] * matrix[0][1];
    } else {
        det = 0.0;
        for i in 0..level {
            let mut tmp_matrix: Matrix = Matrix::new(level - 1, level - 1);
            for j in 1..level {
                let mut j2 = 0;
                for k in 0..level {
                    if k == i {
                        continue;
                    }
                    tmp_matrix[j - 1][j2] = matrix[j][k];
                    j2 += 1;
                }
            }
            det += i32::pow(-1, 1 + i as u32 + 1) as f64
                * matrix[0][i]
                * laplace_determinant(&tmp_matrix, level - 1);
        }
    }

    det
}

pub fn determinant_parallel_task(unit: Arc<Unit>, id: usize) {
    let &(ref mtx, ref cnd) = &(unit.condvar);
    loop {
        let node: Node;
        {
            let mut guard = mtx.lock().unwrap();
            if guard.is_empty() {
                // If stack is empty
                if *unit.processing.lock().unwrap() == 0 {
                    // This is the last non-blocked thread, so kill them all and finish!
                    cnd.notify_all();
                    break;
                } else {
                    // Wait for threads that are still running (maybe they chose the push branch)
                    while guard.is_empty() && *unit.processing.lock().unwrap() != 0 {
                        guard = cnd.wait(guard).unwrap();
                    }
                    if guard.is_empty() {
                        break;
                    }
                }
            }

            *unit.processing.lock().unwrap() += 1;
            node = guard.pop().unwrap();
        }
        let matrix = &node.matrix;
        let coefficient = node.coefficient;

        // let amount = *unit.processing.lock().unwrap();
        // println!("running threads amount: {}", amount);

        let level = matrix.columns;
        if level <= 2 {
            // Branch without push
            let det;
            if level == 1 {
                det = matrix[0][0];
            } else {
                det = matrix[0][0] * matrix[1][1] - matrix[1][0] * matrix[0][1];
            }
            let result = coefficient * det;
            *unit.result.lock().unwrap() += result;
        } else {
            // Branch with push
            for i in 0..level {
                let mut tmp_matrix: Matrix = Matrix::new(level - 1, level - 1);
                for j in 1..level {
                    let mut j2 = 0;
                    for k in 0..level {
                        if k == i {
                            continue;
                        }
                        let tmp_val = matrix[j][k];
                        tmp_matrix[j - 1][j2] = tmp_val;
                        j2 += 1;
                    }
                }
                let sign = i32::pow(-1, 1 + i as u32 + 1) as f64;
                let element = matrix[0][i];
                let new_coefficient = sign * element;
                let node = Node {
                    matrix: tmp_matrix,
                    coefficient: coefficient * new_coefficient,
                };

                mtx.lock().unwrap().push(node);
                cnd.notify_one();
            }
        }
        *unit.processing.lock().unwrap() -= 1;
    }
}

pub fn laplace_determinant_parallel(matrix: &Matrix, threads_amount: usize) -> f64 {
    let node = Node {
        matrix: matrix.clone(),
        coefficient: 1.0,
    };
    let mut queue: Vec<Node> = Vec::new();
    queue.push(node);
    let unit = Unit {
        result: Mutex::new(0.0),
        processing: Mutex::new(0),
        condvar: (Mutex::new(queue), Condvar::new()),
    };
    let unit_ref = Arc::new(unit);

    let mut handles: Vec<JoinHandle<()>> = vec![];
    for i in 0..threads_amount {
        let unit_ref = unit_ref.clone();
        let handle = thread::spawn(move || determinant_parallel_task(unit_ref, i));
        handles.push(handle);
    }
    for handle in handles {
        _ = handle.join();
        // println!("joined");
    }

    let determinant = *unit_ref.result.lock().unwrap();
    determinant
}

const ITERS: usize = 5;
const THREADS_AMOUNT: usize = 8;

fn main() {
    let size: usize = 10;

    let mut matrix = Matrix::new(size, size);

    let mut rng = rand::thread_rng();
    for i in 0..size {
        for j in 0..size {
            matrix[i][j] = (rng.gen::<i32>() >> 24).into();
        }
    }

    let now = SystemTime::now();
    for _ in 0..ITERS {
        _ = laplace_determinant(&matrix, matrix.columns);
    }
    println!(
        "sequential time: {:.2}s",
        now.elapsed().unwrap().as_secs_f64()
    );

    let now = SystemTime::now();
    for _ in 0..ITERS {
        _ = laplace_determinant_parallel(&matrix, THREADS_AMOUNT);
    }
    println!(
        "parallel time: {:.2}s",
        now.elapsed().unwrap().as_secs_f64()
    );
}

Please tell me what I'm doing wrong. If it only works on *nix systems, that's fine with me; I'm running the code on Ubuntu 22.04 with the --release flag and opt-level 3.

Welcome!

There's not enough code here for me to reproduce/debug the problem myself, but one thing I noticed is the way you wait on the Condvar:

Condvar::wait is subject to spurious wakeups. If I’m understanding the code correctly, I think it’s possible for a spurious wakeup here to cause a thread to break out of the loop early without doing any work.
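
The usual way to guard against that is to re-check the predicate after every wakeup, something along these lines (a minimal sketch with made-up types, not your actual Unit):

use std::sync::{Condvar, Mutex};

// Sketch of the predicate-loop pattern: re-check the condition after every
// wakeup, so a spurious wakeup just loops again instead of being treated as
// "there is work" or "we are done".
fn pop_task(queue: &Mutex<Vec<u64>>, cond: &Condvar, shutdown: &Mutex<bool>) -> Option<u64> {
    let mut guard = queue.lock().unwrap();
    loop {
        if let Some(task) = guard.pop() {
            return Some(task);
        }
        if *shutdown.lock().unwrap() {
            return None; // really finished, not a spurious wakeup
        }
        // Waking up here proves nothing by itself; the loop re-checks the queue.
        guard = cond.wait(guard).unwrap();
    }
}

Condvar::wait_while does the same re-checking for you if the predicate only depends on the data behind the mutex.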

Thank you for your reply. I've updated the post with a working example. I think all threads are running now, but it's still slower than the sequential version. Could the problem be in creating an instance of Node? Maybe it's just that stack operations are more efficient than heap operations, but parallel execution takes 9 seconds while sequential takes only 3, which seems weird.

To those interested, here is a playground based on OP's code[1] to experiment with.

I haven't fully grasped the algorithm, so I can't say where the blocking is happening that is causing the delay.

Tangential to the problem of why the multithreaded algorithm is slower: using Vec<Vec<f64>> to represent a matrix is not really what you want to do from a performance perspective. It is basically a list of pointers to the individual rows, which can end up scattered all across your memory, destroying cache locality. You'd either want to represent your matrix as a single Vec<f64> with ROWS * COLUMNS elements and index it as row * COLUMNS + column, where row is your row index and column your column index, or use something like ndarray::Array2, which does this under the hood for you.
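
In rough shape, the flat-storage idea looks something like this (just a sketch; FlatMatrix and the tuple indexing are only for illustration):

use std::ops::{Index, IndexMut};

// Rough sketch of the flat-storage idea: one contiguous allocation, row-major
// layout, indexed as row * columns + column.
pub struct FlatMatrix {
    pub rows: usize,
    pub columns: usize,
    data: Vec<f64>,
}

impl FlatMatrix {
    pub fn new(rows: usize, columns: usize) -> Self {
        FlatMatrix {
            rows,
            columns,
            data: vec![0.0; rows * columns],
        }
    }
}

impl Index<(usize, usize)> for FlatMatrix {
    type Output = f64;
    fn index(&self, (row, column): (usize, usize)) -> &f64 {
        &self.data[row * self.columns + column]
    }
}

impl IndexMut<(usize, usize)> for FlatMatrix {
    fn index_mut(&mut self, (row, column): (usize, usize)) -> &mut f64 {
        &mut self.data[row * self.columns + column]
    }
}

Access then looks like m[(i, j)] instead of m[i][j], and the whole matrix lives in one allocation.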


  1. I used thread::scope instead of joining JoinHandles manually ↩︎

Thanks for the suggestion. Here I have used ndarray [Rust Playground].
But even so, the parallel version is still about 3-4x slower. Maybe there are implicit copies somewhere?

I just checked, and if you run your parallel code with a single thread, it also takes about 4 seconds, where the sequential version takes about 1.5 seconds. Are you sure your algorithm is correct? I put a print statement inside the loop and the loop is executed a lot! Is that supposed to be the case?

You can delete pub result: Mutex<f64> and use each thread's return value instead (or just accumulate each thread's additions into a local total and add it in once after the loop).
I haven't tried it or measured how often the condvar is hit, so it may not be impactful.
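
Roughly this shape, just sketching the return-value part rather than your full worker loop:

use std::thread;

// Each worker keeps a local running sum and returns it from the closure;
// join() hands the partial sums back, so no shared Mutex<f64> is needed.
fn sum_of_partials(threads_amount: usize) -> f64 {
    let mut handles = Vec::new();
    for id in 0..threads_amount {
        handles.push(thread::spawn(move || {
            let mut local = 0.0_f64;
            // ... pop nodes and add each coefficient * det into `local` here ...
            local += id as f64; // placeholder so the sketch is self-contained
            local
        }));
    }
    handles.into_iter().map(|h| h.join().unwrap()).sum()
}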

Yes, I think it's correct, because it gives an accurate answer on small matrices, although it chokes on large ones (I assume due to floating-point error).
The number of loop iterations is the same as the number of function calls in the sequential version [Rust Playground].
Laplace expansion is not a good approach for finding the determinant in general anyway; it has O(n!) complexity.
Also, thanks for the advice, John.

I very naively timed the places where you synchronize and these alone take up more time than your single threaded solution, even though we have zero contention with just a single thread. Playground. There's still a lot of time unaccounted for though.

It's kind of hard to give advice for this reason.

It looks like you are recursing all the way down to 2 by 2 matrices. If you stop the recursion at a higher level you may see improved performance. If you have, say, 10 threads, you can just pass a submatrix to each thread.
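
For example, splitting only the top level and running the sequential laplace_determinant from the post on each cofactor could look roughly like this (an untested sketch that reuses Matrix and laplace_determinant from above and spawns one scoped thread per cofactor):

use std::thread;

// Sketch: expand only along the first row, build the n minors up front, and let
// each scoped thread run the plain sequential algorithm on its minor. No shared
// queue, no condvar, and only one join per cofactor at the end.
pub fn laplace_determinant_split(matrix: &Matrix) -> f64 {
    let level = matrix.columns;
    if level <= 2 {
        return laplace_determinant(matrix, level);
    }
    let mut terms: Vec<(f64, Matrix)> = Vec::new();
    for i in 0..level {
        let mut minor = Matrix::new(level - 1, level - 1);
        for j in 1..level {
            let mut j2 = 0;
            for k in 0..level {
                if k == i {
                    continue;
                }
                minor[j - 1][j2] = matrix[j][k];
                j2 += 1;
            }
        }
        let sign = if i % 2 == 0 { 1.0 } else { -1.0 };
        terms.push((sign * matrix[0][i], minor));
    }
    thread::scope(|s| {
        let handles: Vec<_> = terms
            .iter()
            .map(|(coefficient, minor)| {
                s.spawn(move || coefficient * laplace_determinant(minor, minor.columns))
            })
            .collect();
        handles.into_iter().map(|h| h.join().unwrap()).sum()
    })
}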

Yes, I think you and jofas are both right: too much time is wasted in the operating system, so if I want to shorten the execution time, I will have to reduce the number of syscalls.
It now seems to me that the implementation of the algorithm is the weak point; maybe it is possible to do without a mutex-protected buffer at all. Still, the parallel version now usually runs a little faster than the sequential one, which is exactly what I need for the lab report.
If anyone is interested, here's an example full of magic numbers, since this is the best correlation I've found between the number of threads and the threshold at which it stops using the buffer in favor of the sequential algorithm [Rust Playground].
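
The cutoff itself has roughly this shape inside the worker (not the exact playground code; the threshold value is one of those magic numbers):

// Rough shape of the cutoff: below the threshold a node is finished
// sequentially instead of pushing its minors onto the shared queue, so small
// sub-problems never touch the mutex/condvar. Uses Node and
// laplace_determinant from the post above.
const THRESHOLD: usize = 7; // magic number, tuned experimentally

fn process_node(node: &Node, local_sum: &mut f64) -> bool {
    // Returns true if the node was handled locally; false means the caller
    // should expand it and push the minors onto the shared queue as before.
    if node.matrix.columns <= THRESHOLD {
        *local_sum += node.coefficient
            * laplace_determinant(&node.matrix, node.matrix.columns);
        true
    } else {
        false
    }
}
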
Thanks again everyone for your help!

Example using local chunks: playground (note that the playground does not have 8 cores to run on.)
