How to prevent these threads from starving?

I'm writing a ThreadPool using a VecDeque and a Condvar for waiting. However, I don't get any speedup when I increase the number of threads. Printing the IDs of the threads that process the data reveals that mostly one thread is working while the others never seem to get any work.

This is a minimal example which shows the same behaviour (at least on my machine).

Changing the sleep duration seems to have an impact in this example. However, this of course only simulates other work.

Can someone please explain
1. if I'm doing something fundamentally wrong or
2. how to better distribute the work across the threads.

use std::sync::{Arc, Mutex, Condvar};
use std::thread;
use std::collections::VecDeque;
use std::time::Duration;

fn main() {
    let queue = Arc::new((Mutex::new(VecDeque::new()), Condvar::new()));

    let threads = (0..4).map(|i| {
        let queue2 = queue.clone();
        thread::spawn(move || {
            loop {
                let (ref queue, ref condvar) = *queue2;
                let mut queue = queue.lock().unwrap();
                while queue.len() == 0 {
                    queue = condvar.wait(queue).unwrap();
                }
                
                if let Some(value) = queue.pop_front() {
                    thread::sleep(Duration::from_millis(40));
                    println!("[{}] processing {}", i, value);
                }
            }
        })
    }).collect::<Vec<_>>();

    for i in 0..100 {
        let (ref queue, ref condvar) = *queue;
        let mut queue = queue.lock().unwrap();
        queue.push_back(i);
        
        condvar.notify_all();
    }

    for thread in threads {
        thread.join().expect("Couldn't join.");
    }
}

This might help:

if let Some(value) = queue.pop_front() {
    drop(queue); //    <--- add this line to release the RAII lock
    thread::sleep(Duration::from_millis(40));
    println!("[{}] processing {}", i, value);
}
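
To see why the `drop` matters: a `MutexGuard` keeps the mutex locked until it goes out of scope or is dropped explicitly. Here is a small sketch that checks this with `try_lock`; the function name `guard_blocks_until_dropped` is made up for illustration and is not part of the original code.

```rust
use std::sync::Mutex;

// Hypothetical helper (not from the original code): reports whether the mutex
// could be taken while the guard was alive, and again after dropping it.
fn guard_blocks_until_dropped() -> (bool, bool) {
    let mutex = Mutex::new(0);

    let guard = mutex.lock().unwrap();
    // try_lock never blocks; it returns Err while the lock is already held.
    let free_while_held = mutex.try_lock().is_ok();

    drop(guard); // releases the lock, like `drop(queue)` above
    let free_after_drop = mutex.try_lock().is_ok();

    (free_while_held, free_after_drop)
}
```

In the worker loop this means the other threads can only pop from the queue once the guard is gone, so the sleep has to come after the `drop`.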

There might be other problems hiding in there; I'm not sure as I seldom mess directly with parallel primitives.

Depending on what you are doing (in particular, if you do not need to insert new things into the queue), you might want to check out rayon which has an extremely low barrier to entry. Get your bang for the buck, I say.

@ExpHP is right: you're holding the mutex across the processing call, so no other thread can take work from the queue while one thread is busy. You can write the code as follows:

let value = loop {
    let mut queue = queue.lock().unwrap();
    match queue.pop_front() {
        Some(value) => break value,
        None => queue = condvar.wait(queue).unwrap(),
    }
};
thread::sleep(Duration::from_millis(40));
println!("[{}] processing {}", i, value);
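
For completeness, here is a minimal sketch of a whole pool built this way, with the lock held only while a job is taken off the queue. The names `Shared` and `run_pool`, the `done` flag used to let the workers exit, and the shorter 5 ms sleep are my own assumptions and not part of the original code.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

// Shared state: pending jobs plus a flag saying no more jobs will arrive.
// (`Shared` and `run_pool` are hypothetical names, not from the original post.)
struct Shared {
    jobs: VecDeque<u32>,
    done: bool,
}

/// Runs `jobs` values through `workers` threads; returns (worker id, value) pairs.
fn run_pool(workers: usize, jobs: u32) -> Vec<(usize, u32)> {
    let shared = Arc::new((
        Mutex::new(Shared { jobs: VecDeque::new(), done: false }),
        Condvar::new(),
    ));

    let handles: Vec<_> = (0..workers)
        .map(|id| {
            let shared = Arc::clone(&shared);
            thread::spawn(move || {
                let (lock, condvar) = &*shared;
                let mut processed = Vec::new();
                loop {
                    // Hold the lock only while taking a job off the queue.
                    let value = {
                        let mut state = lock.lock().unwrap();
                        loop {
                            if let Some(value) = state.jobs.pop_front() {
                                break Some(value);
                            }
                            if state.done {
                                break None;
                            }
                            state = condvar.wait(state).unwrap();
                        }
                    }; // guard dropped here, before the slow work
                    match value {
                        Some(value) => {
                            thread::sleep(Duration::from_millis(5)); // simulated work
                            processed.push((id, value));
                        }
                        None => return processed, // queue empty and producer finished
                    }
                }
            })
        })
        .collect();

    {
        let (lock, condvar) = &*shared;
        for i in 0..jobs {
            lock.lock().unwrap().jobs.push_back(i);
            condvar.notify_one(); // one new job, so waking one worker is enough
        }
        lock.lock().unwrap().done = true;
        condvar.notify_all(); // wake every waiting worker so it can exit
    }

    handles.into_iter().flat_map(|h| h.join().unwrap()).collect()
}
```

Calling `run_pool(4, 100)` from `main` and printing the pairs should show the work spread over several worker ids, although the exact split depends on scheduling.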

I agree with the suggestion of looking into rayon though.

Obviously. Thanks! I didn't see that one.

I took a look at rayon and it looks really suitable. However, I'd like to dedicate some time to understanding the basics first.

Using channels instead of the queue would be another option.
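
A rough sketch of how the channel variant could look with only the standard library, assuming the workers share one `mpsc::Receiver` behind a `Mutex` (the name `run_channel_pool` and the 5 ms sleep are made up for illustration):

```rust
use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

/// Hypothetical helper: feeds `jobs` values to `workers` threads over a channel
/// and returns (worker id, value) pairs.
fn run_channel_pool(workers: usize, jobs: u32) -> Vec<(usize, u32)> {
    let (tx, rx) = mpsc::channel::<u32>();
    // std's Receiver is single-consumer, so the workers share it behind a Mutex.
    let rx = Arc::new(Mutex::new(rx));

    let handles: Vec<_> = (0..workers)
        .map(|id| {
            let rx = Arc::clone(&rx);
            thread::spawn(move || {
                let mut processed = Vec::new();
                loop {
                    // The guard is a temporary dropped at the end of this statement,
                    // so the lock is not held during the simulated work below.
                    let msg = rx.lock().unwrap().recv();
                    match msg {
                        Ok(value) => {
                            thread::sleep(Duration::from_millis(5)); // simulated work
                            processed.push((id, value));
                        }
                        // The sender was dropped and the channel is drained.
                        Err(_) => return processed,
                    }
                }
            })
        })
        .collect();

    for i in 0..jobs {
        tx.send(i).unwrap();
    }
    drop(tx); // closing the channel lets the workers exit their loops

    handles.into_iter().flat_map(|h| h.join().unwrap()).collect()
}
```

One worker at a time waits inside `recv()` while holding the mutex, but the lock is released as soon as a value arrives, so the processing itself still runs in parallel.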
