Trying to understand what I need to implement for multi-threading to work in my context

Hello all. This is my first time working with multi-threading, so the set-up comes directly from the "Building a Multithreaded Web Server" chapter of the book. I am repurposing it a little to try new things. I noticed that the threads (in my scenario) don't seem to work through the full word_list I am providing, which is a fuzzing word list containing the numbers 000 through 999.

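use std::fs::File;
use std::io::{self, BufRead};
use std::path::Path;
use std::sync::{mpsc, Arc, Mutex};
use std::thread;

pub struct ThreadPool {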
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

trait FnBox {
    fn call_box(self: Box<Self>);
}

impl<F: FnOnce()>FnBox for F {
    fn call_box(self: Box<F>) {
        (*self)()
    }
}

type Job = Box<dyn FnBox + Send + 'static>;

impl ThreadPool {
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);
        let (sender, receiver) = mpsc::channel();
        let receiver = Arc::new(Mutex::new(receiver));
        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool {
            workers,
            sender,
        }
    }
    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static
    {
        let job = Box::new(f);
        self.sender.send(job).unwrap();
    }
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move|| {
            loop{
                let job = receiver.lock().unwrap().recv().unwrap();
                job.call_box();
            }
        });

        Worker {
            id,
            thread,
        }
    }
}

fn read_lines<P>(filename: P) -> io::Result<io::Lines<io::BufReader<File>>>
where P: AsRef<Path> {
    let file = File::open(filename)
        .expect("Could not open word list.");
    Ok(io::BufReader::new(file)
        .lines())
}

pub fn run(properties: Properties) {
    // let mut headers = HeaderMap::new();
    // if properties._headers.len() >= 1 && properties._headers[0] != String::from("NONE") {
    //     headers = set_headers(properties._headers);
    // };
    compound(&properties._word_list[0]);

}

fn compound(word_list: &String) {
    let pool = ThreadPool::new(5);
    if let Ok(lines) = read_lines(word_list.as_str()) {
        let mut counter = 0;
        for line in lines {
            counter += 1;
            if let Ok(word) = line {
                pool.execute(move|| {
                    println!("{:?}{:?}", word, counter);
                });
            }
        }
    }
}

Currently the threads just print the word they are working on and an incrementing counter. But when I run the code, it only ever shows that maybe 100 items were printed, and it usually stops at 222 or before.

My thinking could be wrong here, because I have never worked with multi-threading before. Do the workers in the pool not remain active for incoming "jobs" in this scenario? I understand that the order will be all jumbled, which is fine, but how can I get a set number of workers to perform all the tasks?

Thanks in advance!

It looks like you're never calling join on the JoinHandle stored in Worker, so you aren't waiting for processing to complete. The program will terminate as soon as the main thread exits after enqueuing all of the jobs, and the prints you're seeing are those that manage to complete before that happens.
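
As a minimal sketch of what joining looks like outside of a pool (the helper name `run_and_join` is made up for illustration and is not part of the original code): each spawned thread's `JoinHandle` is kept, and the caller blocks on `join` for every one of them before reading the results.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical helper: spawns `n` threads that each record their index,
/// then waits for every thread before returning the sorted indices.
fn run_and_join(n: usize) -> Vec<usize> {
    let results = Arc::new(Mutex::new(Vec::new()));

    // Keep every JoinHandle so the threads can be waited on later.
    let handles: Vec<thread::JoinHandle<()>> = (0..n)
        .map(|i| {
            let results = Arc::clone(&results);
            thread::spawn(move || {
                results.lock().unwrap().push(i);
            })
        })
        .collect();

    // Without this loop the caller could return (and the process could
    // exit) before some of the threads have run at all.
    for handle in handles {
        handle.join().unwrap();
    }

    let mut out = results.lock().unwrap().clone();
    out.sort();
    out
}

fn main() {
    let done = run_and_join(8);
    println!("{} of 8 threads finished", done.len());
}
```

Because every handle is joined, all eight indices are present no matter how the threads were scheduled.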


You need to explicitly join your threads. Currently at the end of compound you drop the ThreadPool before the other threads have acted on all the words; presumably then your main function returns and the other threads are shut down.

 pub struct ThreadPool {
     workers: Vec<Worker>,
-    sender: mpsc::Sender<Job>,
+    sender: Option<mpsc::Sender<Job>>,
 }

+// ...
+// Adjust constructor and job sender accordingly
+impl Drop for ThreadPool {
+    fn drop(&mut self) {
+        // Have to get rid of the `Sender` before the `Receiver`s
+        // or the `join`s will block forever
+        self.sender.take();
+        for worker in std::mem::take(&mut self.workers) {
+            worker.thread.join().unwrap();
+        }
+    }
+}

         let thread = thread::spawn(move|| {
             loop{
-                let job = receiver.lock().unwrap().recv().unwrap();
-                job.call_box();
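+                // Don't panic when you can no longer receive.
+                // Bind the result first: a lock guard created in the
+                // `match` scrutinee would stay alive while the job runs,
+                // so only one worker could run a job at a time.
+                let message = receiver.lock().unwrap().recv();
+                match message {
+                    Ok(job) => (job)(), // n.b. see below
+                    Err(_) => break,
+                }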
             }
         });
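
Putting the pieces of this diff together, here is one possible end-to-end sketch of the adjusted pool, including the constructor and `execute` changes that the diff leaves out. Several things here are assumptions for the sake of a short, self-contained example: the `Worker` struct is folded into plain `JoinHandle`s, `Job` is a boxed `FnOnce` (callable directly on Rust 1.35 or later), and `run_jobs` is a hypothetical helper that counts completed jobs instead of printing words.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{mpsc, Arc, Mutex};
use std::thread;

type Job = Box<dyn FnOnce() + Send + 'static>;

pub struct ThreadPool {
    workers: Vec<thread::JoinHandle<()>>,
    sender: Option<mpsc::Sender<Job>>,
}

impl ThreadPool {
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);
        let (sender, receiver) = mpsc::channel::<Job>();
        let receiver = Arc::new(Mutex::new(receiver));
        let workers: Vec<thread::JoinHandle<()>> = (0..size)
            .map(|_| {
                let receiver = Arc::clone(&receiver);
                thread::spawn(move || loop {
                    // The lock guard is a temporary that is dropped at the
                    // end of this statement, so the job runs unlocked.
                    let message = receiver.lock().unwrap().recv();
                    match message {
                        Ok(job) => job(),
                        // The sender is gone and the queue is drained.
                        Err(_) => break,
                    }
                })
            })
            .collect();
        ThreadPool { workers, sender: Some(sender) }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        // `sender` is only `None` while the pool is being dropped.
        let sender = self.sender.as_ref().expect("pool is shutting down");
        sender.send(Box::new(f)).unwrap();
    }
}

impl Drop for ThreadPool {
    fn drop(&mut self) {
        // Drop the sender first so `recv` returns `Err` once the queue
        // is empty; otherwise the joins below would block forever.
        self.sender.take();
        for worker in self.workers.drain(..) {
            worker.join().unwrap();
        }
    }
}

/// Hypothetical helper: queues `jobs` closures on `threads` workers and
/// returns how many had run by the time the pool was dropped.
pub fn run_jobs(jobs: usize, threads: usize) -> usize {
    let completed = Arc::new(AtomicUsize::new(0));
    {
        let pool = ThreadPool::new(threads);
        for _ in 0..jobs {
            let completed = Arc::clone(&completed);
            pool.execute(move || {
                completed.fetch_add(1, Ordering::SeqCst);
            });
        }
    } // `pool` is dropped here, which waits for every queued job.
    completed.load(Ordering::SeqCst)
}

fn main() {
    println!("{} jobs completed", run_jobs(1000, 5));
}
```

Since `Drop` only returns after every worker has exited its loop, the count read after the inner block covers all queued jobs.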

I don't see a point to FnBox, so I also suggest...

-trait FnBox {
-    fn call_box(self: Box<Self>);
-}
-
-impl<F: FnOnce()>FnBox for F {
-    fn call_box(self: Box<F>) {
-        (*self)()
-    }
-}
-type Job = Box<dyn FnBox + Send + 'static>;
+type Job = Box<dyn FnOnce() + Send + 'static>;
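
To illustrate why the helper trait is not needed, here is a small sketch, assuming Rust 1.35 or later, where a `Box<dyn FnOnce()>` can be called directly. The names `run_job` and `demo` are made up for this example.

```rust
use std::sync::mpsc;

type Job = Box<dyn FnOnce() + Send + 'static>;

/// Hypothetical helper: consumes the boxed closure and calls it,
/// with no `FnBox`-style trait in between.
fn run_job(job: Job) {
    job()
}

/// Hypothetical demo: the job moves `word` into the closure and sends
/// it back over a channel so the caller can observe that it ran.
fn demo() -> String {
    let (tx, rx) = mpsc::channel();
    let word = String::from("042");
    let job: Job = Box::new(move || tx.send(word).unwrap());
    run_job(job);
    rx.recv().unwrap()
}

fn main() {
    println!("job produced {:?}", demo());
}
```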

And also

-fn compound(word_list: &String) {
+fn compound(word_list: &Path) {
     let pool = ThreadPool::new(5);
-    if let Ok(lines) = read_lines(word_list.as_str()) {
-        let mut counter = 0;
-        for line in lines {
-            counter += 1;
+    if let Ok(lines) = read_lines(word_list) {
+        for (counter, line) in lines.enumerate() {
             if let Ok(word) = line {
                 pool.execute(move || {
-                    println!("{:?}{:?}", word, counter);
+                    println!("{:?}{:?}", word, counter + 1);
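
For reference, a small sketch of the `enumerate` pattern from the last diff, using an in-memory `Cursor` instead of a file so it runs on its own. The function name `numbered_words` is hypothetical. `enumerate` counts from zero, which is why the diff prints `counter + 1`.

```rust
use std::io::{BufRead, Cursor};

/// Hypothetical helper: pairs each successfully read line with its
/// one-based line number. Unreadable lines are skipped but still
/// counted, matching the behavior of the manual counter.
fn numbered_words<R: BufRead>(reader: R) -> Vec<(usize, String)> {
    reader
        .lines()
        .enumerate()
        .filter_map(|(index, line)| line.ok().map(|word| (index + 1, word)))
        .collect()
}

fn main() {
    let input = Cursor::new("000\n001\n002\n");
    for (counter, word) in numbered_words(input) {
        println!("{:?}{:?}", word, counter);
    }
}
```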

Will give this a try when I get off work. Thank you! Will follow-up.

Thank you again for your solution and providing the information to go with it!

