Hello Rust developers, I'm new to Rust and my goal was to solve a producer-consumer-like problem. This code should implement the parallel Laplace method. The problem is that it is much slower than the sequential implementation. I found that when I pass 12 as the threads_amount
parameter, only 1-3 threads are actually running and the others are blocked.
matrix.rs
use std::error;
use std::fmt;
use std::ops::{Index, IndexMut};
/// Convenience alias for fallible matrix operations.
pub type MatrixResult<T> = std::result::Result<T, Box<dyn error::Error>>;

/// Error raised when a matrix's dimensions do not fit an operation.
#[derive(Debug, Clone)]
pub struct IncorrectDimensions;

impl fmt::Display for IncorrectDimensions {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.write_str("Incorrect dimensions of a matrix")
    }
}

impl error::Error for IncorrectDimensions {}
/// A dense, row-major matrix of `f64` values.
pub struct Matrix {
    pub rows: usize,
    pub columns: usize,
    // Row-major storage: data[row][column].
    data: Vec<Vec<f64>>,
}

impl Matrix {
    /// Allocates a `rows` x `columns` grid of zeros.
    fn create_matrix_data(rows: usize, columns: usize) -> Vec<Vec<f64>> {
        // vec! macro replaces the manual push loop and reserves exactly once.
        vec![vec![0.0; columns]; rows]
    }

    /// Creates a zero-initialized `rows` x `columns` matrix.
    pub fn new(rows: usize, columns: usize) -> Matrix {
        Matrix {
            rows,
            columns,
            data: Self::create_matrix_data(rows, columns),
        }
    }

    /// Returns a deep copy of this matrix.
    ///
    /// NOTE(review): prefer `#[derive(Clone)]` on `Matrix`; an inherent
    /// `clone` method shadows the `Clone` trait and blocks generic code
    /// that requires a `Clone` bound.
    pub fn clone(self: &Matrix) -> Matrix {
        Matrix {
            rows: self.rows,
            columns: self.columns,
            // Vec<Vec<f64>>::clone already deep-copies every row.
            data: self.data.clone(),
        }
    }

    /// Prints the matrix, one row per line, each cell right-padded to 10
    /// characters and separated by '|'.
    pub fn print(&self) {
        for row in &self.data {
            for value in row {
                print!("{:10}|", value);
            }
            println!();
        }
    }
}
impl Index<usize> for Matrix {
    type Output = Vec<f64>;

    /// Immutable row access: `matrix[i]` yields row `i`.
    /// Lifetimes are elided; the compiler infers the same signature.
    fn index(&self, i: usize) -> &Vec<f64> {
        &self.data[i]
    }
}
impl IndexMut<usize> for Matrix {
    /// Mutable row access: `matrix[i][j] = x` writes through this.
    fn index_mut(&mut self, i: usize) -> &mut Vec<f64> {
        &mut self.data[i]
    }
}
main.rs
pub mod matrix;
use crate::matrix::Matrix;
use rand::Rng;
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::thread::JoinHandle;
use std::time::SystemTime;
/// One unit of work on the shared stack: a (sub)matrix still to be expanded,
/// together with the signed cofactor product accumulated on the path from
/// the original matrix down to this minor.
pub struct Node {
    pub matrix: Matrix,
    pub coefficient: f64,
}
/// State shared by all worker threads of the parallel determinant.
pub struct Unit {
    /// Accumulated determinant; workers add their partial results here.
    pub result: Mutex<f64>,
    /// Number of threads currently expanding a node (and hence possibly
    /// about to push new work). Together with stack emptiness this detects
    /// termination: empty stack + processing == 0 means no work can appear.
    pub processing: Mutex<usize>,
    /// Work stack of pending nodes plus the condvar workers block on while
    /// the stack is empty.
    pub condvar: (Mutex<Vec<Node>>, Condvar),
}
/// Computes the determinant of the leading `level` x `level` part of
/// `matrix` by recursive Laplace (cofactor) expansion along the first row.
///
/// Runs in O(level!) time, so it is only practical for small matrices.
pub fn laplace_determinant(matrix: &Matrix, level: usize) -> f64 {
    // Closed forms for the base cases.
    if level == 1 {
        return matrix[0][0];
    }
    if level == 2 {
        return matrix[0][0] * matrix[1][1] - matrix[1][0] * matrix[0][1];
    }
    let mut det = 0.0;
    for i in 0..level {
        // Build the minor: drop row 0 and column i.
        let mut minor = Matrix::new(level - 1, level - 1);
        for j in 1..level {
            let mut col = 0;
            for k in 0..level {
                if k != i {
                    minor[j - 1][col] = matrix[j][k];
                    col += 1;
                }
            }
        }
        // (-1)^(i+2) == (-1)^i, written directly instead of i32::pow.
        let sign = if i % 2 == 0 { 1.0 } else { -1.0 };
        det += sign * matrix[0][i] * laplace_determinant(&minor, level - 1);
    }
    det
}
pub fn determinant_parallel_task(unit: Arc<Unit>, id: usize) {
let &(ref mtx, ref cnd) = &(unit.condvar);
loop {
let node: Node;
{
let mut guard = mtx.lock().unwrap();
if guard.is_empty() {
// If stack is empty
if *unit.processing.lock().unwrap() == 0 {
// This is the last non-blocked thread, so kill them all and finish!
cnd.notify_all();
break;
} else {
// Wait for threads that are still running (maybe they chose the push branch)
while guard.is_empty() && *unit.processing.lock().unwrap() != 0 {
guard = cnd.wait(guard).unwrap();
}
if guard.is_empty() {
break;
}
}
}
*unit.processing.lock().unwrap() += 1;
node = guard.pop().unwrap();
}
let matrix = &node.matrix;
let coefficient = node.coefficient;
// let amount = *unit.processing.lock().unwrap();
// println!("running threads amount: {}", amount);
let level = matrix.columns;
if level <= 2 {
// Branch without push
let det;
if level == 1 {
det = matrix[0][0];
} else {
det = matrix[0][0] * matrix[1][1] - matrix[1][0] * matrix[0][1];
}
let result = coefficient * det;
*unit.result.lock().unwrap() += result;
} else {
// Branch with push
for i in 0..level {
let mut tmp_matrix: Matrix = Matrix::new(level - 1, level - 1);
for j in 1..level {
let mut j2 = 0;
for k in 0..level {
if k == i {
continue;
}
let tmp_val = matrix[j][k];
tmp_matrix[j - 1][j2] = tmp_val;
j2 += 1;
}
}
let sign = i32::pow(-1, 1 + i as u32 + 1) as f64;
let element = matrix[0][i];
let new_coefficient = sign * element;
let node = Node {
matrix: tmp_matrix,
coefficient: coefficient * new_coefficient,
};
mtx.lock().unwrap().push(node);
cnd.notify_one();
}
}
*unit.processing.lock().unwrap() -= 1;
}
}
/// Computes the determinant of `matrix` using `threads_amount` worker
/// threads cooperating through a shared work stack
/// (see `determinant_parallel_task`).
pub fn laplace_determinant_parallel(matrix: &Matrix, threads_amount: usize) -> f64 {
    // Seed the stack with the whole problem (coefficient 1).
    let root = Node {
        matrix: matrix.clone(),
        coefficient: 1.0,
    };
    let unit_ref = Arc::new(Unit {
        result: Mutex::new(0.0),
        processing: Mutex::new(0),
        condvar: (Mutex::new(vec![root]), Condvar::new()),
    });
    let handles: Vec<JoinHandle<()>> = (0..threads_amount)
        .map(|i| {
            let unit_ref = Arc::clone(&unit_ref);
            thread::spawn(move || determinant_parallel_task(unit_ref, i))
        })
        .collect();
    for handle in handles {
        // Propagate worker panics instead of silently discarding them:
        // a swallowed panic would make us return a partial (wrong) result.
        handle.join().expect("worker thread panicked");
    }
    let determinant = *unit_ref.result.lock().unwrap();
    determinant
}
/// Number of timed repetitions per implementation.
const ITERS: usize = 5;
/// Worker-thread count for the parallel run.
const THREADS_AMOUNT: usize = 8;

fn main() {
    // Instant is the monotonic clock meant for measuring elapsed time;
    // SystemTime can jump if the wall clock is adjusted, and its
    // `elapsed()` needs an `unwrap()`.
    use std::time::Instant;

    let size: usize = 10;
    let mut matrix = Matrix::new(size, size);

    // Fill with small random integers (top 8 bits of a random i32,
    // i.e. -128..=127) so intermediate products stay well inside f64
    // precision.
    let mut rng = rand::thread_rng();
    for i in 0..size {
        for j in 0..size {
            matrix[i][j] = (rng.gen::<i32>() >> 24).into();
        }
    }

    let now = Instant::now();
    for _ in 0..ITERS {
        _ = laplace_determinant(&matrix, matrix.columns);
    }
    println!("sequential time: {:.2}s", now.elapsed().as_secs_f64());

    let now = Instant::now();
    for _ in 0..ITERS {
        _ = laplace_determinant_parallel(&matrix, THREADS_AMOUNT);
    }
    println!("parallel time: {:.2}s", now.elapsed().as_secs_f64());
}
Please tell me what I'm doing wrong. If it only works on *nix systems, that's fine with me; I'm running the code on Ubuntu 22.04 with the --release flag and level 3 optimization.