Why is my multithreaded approach slower than the straightforward single-threaded approach?

I was trying this programming puzzle in Rust:

Implement a parallel valid parenthesis checker

I am using multithreading here to simulate parallel processing.
Here is my approach:

  • Keep T threads ready to check a given chunk
  • Chunk batching: send T chunks to the T threads
  • For each of the T threads, chunk_receiver and result_sender are moved into the thread, while chunk_sender and result_receiver are owned by the main thread
  • Receive results for a batch (parallel execution)
  • Combine results sequentially

I have tested the code and validated it against all major test cases.
But when I benchmarked the multithreaded solution against the traditional one, I found it to be much slower.

I am unclear about...

  1. Why isn't there much of a performance increase?
  2. Do parameters like CHUNK_SIZE and NUM_THREADS play a major role in such a benchmark?
  3. Is there a better way to parallelize the solution to this problem?
  4. When I ran the binary, I watched the per-core load with htop, and the cores were not being utilized simultaneously. The load kept moving between cores, but only one core was maxed out at any given moment. Is this the expected behavior for a multithreaded application?

Edit: I built the project with cargo build --release, and there is still hardly any performance improvement compared to the basic brute-force approach.

Any help is appreciated. Thanks!

Here is the playground link, or see the code below:

use rand::Rng;
use std::{
    sync::mpsc::{channel, Receiver, Sender},
    thread::{self, JoinHandle},
    time::Instant,
};

struct ThreadInfo {
    thread: JoinHandle<()>,
    chunk_sender: Sender<String>,
    result_receiver: Receiver<ChunkResult>,
}

struct ChunkResult {
    // net change in nesting depth across the chunk
    balance: i64,
    // lowest depth reached within the chunk, relative to its start
    min_balance: i64,
}

fn process_chunk(chunk: String) -> ChunkResult {
    let mut balance = 0;
    let mut min_balance: i64 = i64::max_value();

    for c in chunk.chars() {
        match c {
            '(' => balance += 1,
            ')' => balance -= 1,
            _ => {
                panic!("invalid parenthesis given as input")
            }
        }

        if balance < min_balance {
            min_balance = balance;
        }
    }

    ChunkResult {
        balance,
        min_balance,
    }
}

fn worker_thread(chunk_receiver: Receiver<String>, result_sender: Sender<ChunkResult>) {
    for chunk in chunk_receiver {
        result_sender.send(process_chunk(chunk)).unwrap();
    }
}

fn check_valid_parentheses(input: &str) -> bool {
    const CHUNK_SIZE: usize = 1000000; // Adjust the chunk size as needed
    const NUM_THREADS: usize = 10; // Adjust the number of threads as needed

    let chunks: Vec<_> = input.chars().collect();

    let threads: Vec<ThreadInfo> = (0..NUM_THREADS)
        .map(|_| {
            let (chunk_sender, chunk_receiver): (Sender<String>, Receiver<String>) = channel();
            let (result_sender, result_receiver): (Sender<ChunkResult>, Receiver<ChunkResult>) =
                channel();

            ThreadInfo {
                thread: thread::spawn(move || {
                    worker_thread(chunk_receiver, result_sender);
                }),
                chunk_sender,
                result_receiver,
            }
        })
        .collect();

    let mut global_balance: i64 = 0;
    let mut global_min_balance: i64 = i64::max_value();

    for chunk_batch in chunks.chunks(CHUNK_SIZE * NUM_THREADS) {
        let mut batch_size = 0;
        chunk_batch
            .chunks(CHUNK_SIZE)
            .map(|chunk| chunk.iter().collect::<String>())
            .enumerate()
            .for_each(|(id, chunk)| {
                batch_size += 1;
                threads[id].chunk_sender.send(chunk).unwrap();
            });
        for i in 0..batch_size {
            // preserving the same order in which we sent the chunks
            let chunk_result = threads[i].result_receiver.recv().unwrap();
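            // a chunk's min_balance is relative to the start of that chunk,
            // so offset it by the running balance before comparing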
            global_min_balance = global_min_balance.min(global_balance + chunk_result.min_balance);
            global_balance += chunk_result.balance;
        }
    }

    for thread_info in threads {
        // Notify worker threads that no more chunks will be sent
        drop(thread_info.chunk_sender);
        thread_info.thread.join().unwrap();
    }

    global_balance == 0 && global_min_balance >= 0
}
fn main() {
    for id in 0..3 {
        let s = generate_random_parentheses();

        let start_time = Instant::now();
        let correct = brute_checker(&s);
        let brute_time = start_time.elapsed();

        let start_time = Instant::now();
        let solution = check_valid_parentheses(&s);
        let my_time = start_time.elapsed();

        println!("Brute: {:?}, My: {:?}", brute_time, my_time);
        assert_eq!(solution, correct);
        println!("OK test {}", id);
    }
}

fn brute_checker(s: &str) -> bool {
    let (mut bal, mut min_bal) = (0, i64::MAX);
    for ch in s.chars() {
        if ch == '(' {
            bal += 1;
        } else if ch == ')' {
            bal -= 1;
        } else {
            panic!("bad input for string");
        }
        min_bal = min_bal.min(bal);
    }
    bal == 0 && min_bal >= 0
}

fn generate_random_parentheses() -> String {
    let mut rng = rand::thread_rng();
    let mut parentheses = String::new();
    let mut balance = 0;
    let length = 10000000;

    while parentheses.len() < length {
        let random_char = if balance > 0 && rng.gen::<f64>() < 0.5 {
            balance -= 1;
            ')'
        } else {
            balance += 1;
            '('
        };

        parentheses.push(random_char);
    }

    while balance > 0 {
        parentheses.push(')');
        balance -= 1;
    }

    parentheses
}

#[cfg(test)]
mod tests {
    use crate::{check_valid_parentheses, generate_random_parentheses};

    #[test]
    fn test_valid_parentheses() {
        let tests = [
            ("()()()()()()()()()()()()", true),
            ("(((((((((((((((((((((((((())))))))))))))))))))))))))", true),
            ("((((((()))))))(((((((())))))))(((((((())))))))", true),
        ];
        for (s, answer) in tests {
            let result = check_valid_parentheses(s);
            assert_eq!(answer, result);
        }
    }

    #[test]
    fn test_invalid_parentheses() {
        let tests = [
            ("))))(((())))", false),
            ("((()))((", false),
            ("())))", false),
        ];
        for (s, answer) in tests {
            let result = check_valid_parentheses(s);
            assert_eq!(answer, result);
        }
    }

    #[test]
    fn test_edge_cases() {
        let tests = [("", true), ("(", false), ("((()))", true)];
        for (s, answer) in tests {
            let result = check_valid_parentheses(s);
            assert_eq!(answer, result);
        }
    }

    #[test]
    fn test_large_input_valid() {
        let mut s = "(".repeat(10000);
        s.push_str(&")".repeat(10000));
        assert_eq!(check_valid_parentheses(s.as_str()), true);
    }

    #[test]
    fn test_large_input_invalid() {
        let s = ")".repeat(1000000);
        assert_eq!(check_valid_parentheses(s.as_str()), false);
    }

    #[test]
    fn test_random_large_input_valid() {
        for _ in 0..5 {
            let s = generate_random_parentheses();
            assert_eq!(check_valid_parentheses(s.as_str()), true);
        }
    }
}

Multi-threaded programs are not necessarily faster than single-threaded ones. In many common cases, multi-threading is actually slower, because two or more threads often need to share data, either by passing messages among themselves (which takes time) or by waiting for each other at various points to avoid read/write conflicts over the same data (which also takes time). If the overhead of multi-threading is greater than the time saved by having multiple cores available, you won't get any performance improvement.
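As a rough illustration of that trade-off (a toy example unrelated to the parentheses problem, with an arbitrarily small workload), spawning and joining a thread for a job that only takes microseconds costs more than simply doing the job inline:

use std::time::Instant;

fn main() {
    let data: Vec<u64> = (0..1_000).collect();

    // Summing 1000 numbers inline: the work itself is tiny.
    let start = Instant::now();
    let sequential: u64 = data.iter().sum();
    println!("inline:   {:?}", start.elapsed());

    // Spawning (and joining) a thread to do the same tiny sum: the
    // thread-management overhead dwarfs the useful work.
    let start = Instant::now();
    let threaded = std::thread::scope(|s| s.spawn(|| data.iter().sum::<u64>()).join().unwrap());
    println!("threaded: {:?}", start.elapsed());

    assert_eq!(sequential, threaded);
}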

Having said that, I think your code could be improved by splitting your string into non-overlapping slices and passing those directly into each worker thread (no need for message channels, which have a noticeable overhead). Something like this:

std::thread::scope(|s| {
    // split your 10 million character string into equal halves
    let (part_1, part_2) = input.split_at(5000000);

    // each thread gets half the workload
    let handler_1 = s.spawn(|| brute_checker(part_1));
    let handler_2 = s.spawn(|| brute_checker(part_2));

    handler_1.join().unwrap();
    handler_2.join().unwrap();
});

You can try splitting recursively to get more chunks and benchmark the results.
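One caveat: a slice such as "(((" is not valid on its own even when the whole string is, so for the general case each thread should return the partial (balance, min_balance) pair from your ChunkResult rather than a final yes/no, and the main thread should combine those in order just like your batch loop already does. A rough sketch of that, assuming process_chunk is changed to take a &str (splitting at byte boundaries is fine here because the input is ASCII-only):

fn check_valid_parentheses_scoped(input: &str, num_threads: usize) -> bool {
    // ceiling division, so we spawn at most num_threads threads
    let chunk_len = ((input.len() + num_threads - 1) / num_threads).max(1);

    // one ChunkResult per slice, in input order
    let results: Vec<ChunkResult> = std::thread::scope(|s| {
        let handles: Vec<_> = input
            .as_bytes()
            .chunks(chunk_len)
            .map(|bytes| {
                // borrowed slice: nothing is cloned and no channel is used
                let chunk = std::str::from_utf8(bytes).unwrap();
                s.spawn(move || process_chunk(chunk))
            })
            .collect();
        handles.into_iter().map(|h| h.join().unwrap()).collect()
    });

    // same sequential combine step as the batch loop in the original code
    let mut balance: i64 = 0;
    let mut min_balance: i64 = i64::MAX;
    for r in results {
        min_balance = min_balance.min(balance + r.min_balance);
        balance += r.balance;
    }
    balance == 0 && min_balance >= 0
}

Everything the worker threads touch is borrowed, so no copying happens before they start, and the only synchronization is the final join.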


In the multithreaded version you are pretty much cloning all the chunks synchronously, twice, before sending them to the threads. This requires reading the whole input, which takes roughly as long as running brute_checker itself. And it happens before the threads even start working on their chunks!

You can solve this by slicing the input string and sending only borrowed slices to the worker threads. To do this you'll need std::thread::scope in order to pass borrowed data to the threads, and you'll also have to change how the chunks are computed (since there is no chunks method for str). For example: Rust Playground (note that the playground has limited parallelism, so you likely won't see much difference between the sequential and parallel approaches there).
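For instance, one way to produce borrowed chunks (a small illustrative helper, relying on the fact that the input contains only ASCII '(' and ')', so every byte boundary is also a char boundary):

// Yields chunk_size-sized &str slices without allocating any Strings.
fn str_chunks(input: &str, chunk_size: usize) -> impl Iterator<Item = &str> {
    input
        .as_bytes()
        .chunks(chunk_size)
        .map(|bytes| std::str::from_utf8(bytes).expect("input is ASCII"))
}

The scoped worker threads can then be handed these borrowed slices directly.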

Though for this simple example I would avoid using channels: Rust Playground

