Help with simple thread pool based on the book

I had a go at implementing a thread-pool-like program based on the example "Turning Our Single-Threaded Server into a Multithreaded Server" from The Rust Programming Language (the book).

use std::time::Duration;
use std::sync::{Arc, Mutex, mpsc};

// Demo: three worker threads share a single mpsc receiver behind an
// `Arc<Mutex<..>>`. The main thread sends the integers 0..10 down the channel,
// and each worker receives exactly one of them before "working" (sleeping).
fn main() {
    // The values that will be sent to the workers as jobs.
    let stream = 0..10;

    let (sender, receiver) = mpsc::channel();

    // `Arc` lets every worker own a handle to the receiver; `Mutex` ensures
    // only one worker at a time can call `recv` on it.
    let shared_receiver = Arc::new(Mutex::new(receiver));

    // Spawns one worker thread that locks the shared receiver, takes a single
    // value from the channel, sleeps for 2 s and then exits (there is no loop,
    // so each worker handles at most one value).
    //
    // * `reciver_mutex` - this worker's handle to the shared receiver.
    // * `thread_num`    - label used only in the log output.
    //
    // The spawned thread panics if the mutex is poisoned or if `recv` fails
    // because the sending side has been dropped (both results are unwrapped).
    fn new_worker(reciver_mutex: Arc<Mutex<mpsc::Receiver<i32>>>, thread_num: i32) {
        // The `JoinHandle` returned by `spawn` is discarded, so the thread is
        // detached and never joined.
        std::thread::spawn(move || {
                println!("Thread {} started", thread_num);
                // `lock()` blocks until the mutex is free and returns a
                // `MutexGuard`. The guard is bound to a named local and is
                // never dropped explicitly, so the lock stays held until
                // `local_mutex` goes out of scope at the end of this closure.
                let local_mutex = reciver_mutex.lock().unwrap();
                println!("Thread {} aquired lock", thread_num);
                // Blocks until a value is available on the channel.
                let n = local_mutex.recv().unwrap(); 
                println!("Thread {} recived {:?}", thread_num, n);
                // Simulated work. The guard is still alive here, so other
                // workers remain blocked in `lock()` for the whole sleep.
                std::thread::sleep(Duration::from_millis(2000));
                println!("Thread {} ended", thread_num);
        });
    }

    // Start workers 1, 2 and 3, each with its own clone of the `Arc`.
    for thread_num in 1..4 {
        new_worker(Arc::clone(&shared_receiver), thread_num);
    }

    // Send all ten values. With three workers that each receive once, at most
    // three of them are ever consumed.
    for n in stream {
        sender.send(n).unwrap();
        println!("Sent {}", n)
    }

    // Presumably keeps the main thread (and thus the process) alive long
    // enough for the detached workers to finish, since nothing joins them.
    std::thread::sleep(Duration::from_secs(10));
}

However this outputs

Thread 1 started
Thread 1 aquired lock
Sent 0
Sent 1
Sent 2
Sent 3
Sent 4
Thread 1 recived 0
Thread 2 started
Thread 3 started
Thread 1 ended
Thread 2 aquired lock
Thread 2 recived 1
Thread 2 ended
Thread 3 aquired lock
Thread 3 recived 2
Thread 3 ended

All the threads are able to start in parallel, but they only seem to be able to acquire the mutex lock once the previous thread has ended, rather than when the lock is released, which I expected to happen at the end of the following line:

let n = local_mutex.recv().unwrap(); 

so we end up with sequential behaviour instead of the 'work' (i.e. the sleeps) running concurrently.

What am I missing?

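Drop the MutexGuard (local_mutex) once you no longer need it. The lock is not released at the end of the recv() line: the guard is bound to the variable local_mutex, so it stays alive (and the mutex stays locked) until that variable goes out of scope at the end of the closure, which is after the sleep. Dropping it explicitly releases the lock before the "work" starts: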

     println!("Thread {} recived {:?}", thread_num, n);
+    drop(local_mutex);
     std::thread::sleep(Duration::from_millis(2000));

Another example can be found in the docs.

1 Like

Thanks that worked!