Stuck, need help with threading and channels

I am trying to implement my own ThreadPool for educational purposes (following the course from Matthew Stoodly), but I can't seem to make it work. I can't understand why I am not getting a response from all my threads.

use std::sync::{Arc, Mutex, mpsc::{Receiver, Sender, channel}};

pub struct ThreadPool{
    size: u32,
    sender: Option<Sender<Box<dyn Fn() + Send>>>,
    ch_done: Receiver<()>

}

impl ThreadPool{
    pub fn new(size: u32)->Self{
        let (sender, receiver) = channel();
        let (ch_send, ch_recv) = channel();
        let a = Arc::new(Mutex::new(receiver));

        for i in 0..size{
            let a2 = a.clone();
            let s2 = ch_send.clone();
            std::thread::spawn(move || loop{
                let m_guard = a2.lock().unwrap();
                let f: Box<dyn Fn() + Send> = match m_guard.recv(){
                    Ok(f) => f,
                    Err(_) => {
                        s2.send(()).ok();
                        return;
                    }
                };
                drop(m_guard);
                f();
                // println!("Sending done: {}", i);
                s2.send(()).ok();
            });
        }
        ThreadPool{size, sender: Some(sender), ch_done: ch_recv}
    }

    pub fn run<F: Fn() + Send + 'static>(&self, f:F){
        if let Some(ref ch) = self.sender{
            ch.send(Box::new(f)).expect("Failed in run()");
        }
    }

    pub fn wait(mut self, size: u32){
        // println!("Waiting for {} threads to finish", size);
        self.sender.take();
        for _ in 0..size{
            self.ch_done.recv().expect("Panicking in wait()");
            // println!("Closed a thread");
        }
    }
}

pub fn main(){
    let tp = ThreadPool::new(10);
    for i in 0..20{
        tp.run(move || {
            std::thread::sleep(std::time::Duration::from_millis(200));
            println!("running in thread = {}", i);
        });
    }

    // std::thread::sleep(std::time::Duration::from_millis(3000));
    tp.wait(20);
}


It is random. Sometimes I get the print output from 18 threads, sometimes from 19, but never from all 20. What am I doing wrong?

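You close the channel in wait() with self.sender.take();
Any worker that then finds the queue empty gets an error from recv() and sends a done message back: Err(_) => { s2.send(()).ok();
wait(20) counts those exit messages together with the task-completion messages, so it can return, and the program can end, before the last tasks have finished running.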
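
To make the counting concrete, here is a minimal sketch (not the code from the course) that tags the two kinds of done message so they can be told apart. The `Signal` enum and the `count_signals` function are hypothetical names introduced only for this illustration, and the 10 ms sleep stands in for real work.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

type Job = Box<dyn Fn() + Send>;

// Hypothetical message type: lets the waiter tell the two signals apart.
enum Signal {
    TaskDone,
    WorkerExit,
}

/// Runs `tasks` jobs on `workers` threads and returns
/// (task completions seen, worker exits seen).
fn count_signals(workers: usize, tasks: usize) -> (usize, usize) {
    let (job_tx, job_rx): (Sender<Job>, Receiver<Job>) = channel();
    let (sig_tx, sig_rx) = channel();
    let job_rx = Arc::new(Mutex::new(job_rx));

    for _ in 0..workers {
        let job_rx = Arc::clone(&job_rx);
        let sig_tx = sig_tx.clone();
        thread::spawn(move || loop {
            // The temporary guard is dropped at the end of this statement,
            // so the lock is not held while the job runs.
            let job = job_rx.lock().unwrap().recv();
            match job {
                Ok(job) => {
                    job();
                    sig_tx.send(Signal::TaskDone).ok();
                }
                Err(_) => {
                    // Channel closed and empty: same branch as in the question.
                    sig_tx.send(Signal::WorkerExit).ok();
                    return;
                }
            }
        });
    }
    drop(sig_tx); // only the workers hold signal senders now

    for _ in 0..tasks {
        job_tx
            .send(Box::new(|| thread::sleep(Duration::from_millis(10))))
            .unwrap();
    }
    drop(job_tx); // same effect as `self.sender.take()` in `wait`

    // Drain every signal; the loop ends once all workers have exited.
    let (mut done, mut exited) = (0, 0);
    for signal in sig_rx {
        match signal {
            Signal::TaskDone => done += 1,
            Signal::WorkerExit => exited += 1,
        }
    }
    (done, exited)
}
```

With 10 workers and 20 tasks, `count_signals(10, 20)` sees 30 messages in total. The original `wait(20)` stops after the first 20 of them, and some of those can be exit messages from workers that went idle early.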

Well you only spawn 20 threads and they start counting from 0.

fn main() {
    for i in 0..20 {
        dbg!(i);
    }
}

(try executing it, you won't see 20 here either)

If you want to spawn 21 threads you have to change the range to 0..=20.

Thank you John for your reply. I see. Strangely, in Matthew Stoodly's video course it works fine. Is there any way to circumvent this issue?

Thank you Luro02. I can see a total of 20 threads. It is just that I am not getting a response back from all 20.

I see, I misinterpreted your sentence :sweat_smile:

I have not seen the video, so I can't comment on it.
It depends on what you want to happen. The lazy change is to raise the argument to wait() to 30: 20 task completions plus 10 worker exits. A better question is why wait() takes an argument in the first place, since it invites user error. You need to decide whether you want to track that the threads finish, or just the tasks, and let the OS deal with the threads once main ends.
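
One way to drop the argument entirely, sketched under the assumption that tracking the threads is what you want: keep the `JoinHandle`s and join them in `wait()`. This is only an illustration, not the course's implementation. The `handles` field and the `run_counted` helper are names made up for this example.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};

type Job = Box<dyn Fn() + Send>;

pub struct ThreadPool {
    sender: Option<Sender<Job>>,
    handles: Vec<JoinHandle<()>>, // assumption: replaces the `ch_done` channel
}

impl ThreadPool {
    pub fn new(size: usize) -> Self {
        let (sender, receiver): (Sender<Job>, Receiver<Job>) = channel();
        let receiver = Arc::new(Mutex::new(receiver));
        let handles: Vec<JoinHandle<()>> = (0..size)
            .map(|_| {
                let receiver = Arc::clone(&receiver);
                thread::spawn(move || loop {
                    // Lock only long enough to take one job off the channel.
                    let job = receiver.lock().unwrap().recv();
                    match job {
                        Ok(job) => job(),
                        Err(_) => return, // channel closed and drained
                    }
                })
            })
            .collect();
        ThreadPool { sender: Some(sender), handles }
    }

    pub fn run<F: Fn() + Send + 'static>(&self, f: F) {
        if let Some(sender) = &self.sender {
            sender.send(Box::new(f)).expect("all workers have exited");
        }
    }

    /// Closes the queue, then blocks until every worker has drained it.
    pub fn wait(mut self) {
        drop(self.sender.take());
        for handle in self.handles {
            handle.join().expect("a worker thread panicked");
        }
    }
}

/// Hypothetical helper for this example: counts how many jobs actually ran.
pub fn run_counted(workers: usize, tasks: usize) -> usize {
    let counter = Arc::new(AtomicUsize::new(0));
    let pool = ThreadPool::new(workers);
    for _ in 0..tasks {
        let counter = Arc::clone(&counter);
        pool.run(move || {
            counter.fetch_add(1, Ordering::SeqCst);
        });
    }
    pool.wait();
    counter.load(Ordering::SeqCst)
}
```

Because `recv()` keeps returning queued jobs after the sender is dropped and only fails once the channel is empty, joining the workers guarantees that every submitted job has run by the time `wait()` returns.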

Thank you John. Your earlier reply forced me to think from a different perspective. I am pretty sure that my current code is not the best design, but it works now! Please do comment if you feel I can improve any area of the code.

use std::sync::{Arc, Mutex};
use std::collections::VecDeque;

pub struct Tasker{
    queue: VecDeque<Box<dyn Fn() + Send>>,
    num_threads: u8,
    pool: ThreadPool
}

impl Tasker{
    pub fn new(size: u8)->Self{
        Tasker{
            queue: VecDeque::new(),
            num_threads: size,
            pool: ThreadPool::new(size),
        }
    }

    pub fn add<F: Fn() + Send + 'static>(&mut self, f:F){
        self.queue.push_front(Box::new(f));
    }

    pub fn execute(self){
        self.pool.execute(self.queue);
    }
}


pub struct ThreadPool{
    size: u8,
    tasks: Arc<Mutex<VecDeque<Box<dyn Fn() + Send>>>>,
    handles: Vec<std::thread::JoinHandle<()>>
}

impl ThreadPool{
    fn new(size: u8)->Self{
        let tasks = Arc::new(Mutex::new(VecDeque::new()));
        let started = Arc::new(Mutex::new(false));
        let mut handles = Vec::new();

        for _ in 0..size{
            let task_queue = tasks.clone();
            let start_tracker = started.clone();

            handles.push(std::thread::spawn(move || loop{
                let mut m_guard = task_queue.lock().unwrap();
                let mut start_guard = start_tracker.lock().unwrap();
                // If the queue is empty and no thread has started, simply keep looping.
                // If the queue is empty and a thread has started, the entire queue has been processed and it is time to stop.
                if (*m_guard).is_empty() && *start_guard{
                    return;
                }
                // If the queue contains tasks, each available thread pulls one task out of the queue and processes it.
                if !(*m_guard).is_empty(){
                    let task: Box<dyn Fn() + Send> = (*m_guard).pop_back().unwrap();
                    *start_guard = true;
                    drop(m_guard);
                    drop(start_guard);
                    task();
                }
            }));
        }

        ThreadPool{
            size,
            tasks,
            handles
        }
    }

    fn execute(self, tasks: VecDeque<Box<dyn Fn() + Send>>){
        let mut task_queue = self.tasks.lock().unwrap();
        *task_queue = tasks;
        drop(task_queue);
        for handle in self.handles{
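            // join() returns a Result; surface a worker panic instead of silently ignoring it.
            handle.join().expect("worker thread panicked");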
        }
    }
}

pub fn main(){
    let mut tasker = Tasker::new(10);
    for i in 0..30{
        tasker.add(move ||{
            println!("Running {}", i);
        });
    }
    tasker.execute();
    // std::thread::sleep(std::time::Duration::from_millis(1000));

}

Maybe I don't need the Tasker struct. I think I can squeeze everything into the ThreadPool.

Once again, thank you for your feedback.
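
A possible shape for the merged version, as a rough sketch rather than a definitive design: the pool owns the queue, and the workers are spawned inside `execute()`, after all tasks have been added. That choice is an assumption of this sketch; it removes the need for the `started` flag and for idle threads spinning on an empty queue. The `count_executed` helper is hypothetical and exists only to show usage.

```rust
use std::collections::VecDeque;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;

type Task = Box<dyn Fn() + Send>;

pub struct ThreadPool {
    size: u8,
    queue: VecDeque<Task>,
}

impl ThreadPool {
    pub fn new(size: u8) -> Self {
        ThreadPool { size, queue: VecDeque::new() }
    }

    pub fn add<F: Fn() + Send + 'static>(&mut self, f: F) {
        self.queue.push_back(Box::new(f));
    }

    /// Spawns the workers, lets them drain the queue, and joins them.
    pub fn execute(self) {
        let queue = Arc::new(Mutex::new(self.queue));
        let handles: Vec<_> = (0..self.size)
            .map(|_| {
                let queue = Arc::clone(&queue);
                thread::spawn(move || loop {
                    // Pop under the lock, then release it before running the task.
                    let task = queue.lock().unwrap().pop_front();
                    match task {
                        Some(task) => task(),
                        None => return, // queue drained: nothing more can arrive
                    }
                })
            })
            .collect();
        for handle in handles {
            handle.join().expect("a worker thread panicked");
        }
    }
}

/// Hypothetical helper for this example: counts how many tasks actually ran.
pub fn count_executed(workers: u8, tasks: usize) -> usize {
    let counter = Arc::new(AtomicUsize::new(0));
    let mut pool = ThreadPool::new(workers);
    for _ in 0..tasks {
        let counter = Arc::clone(&counter);
        pool.add(move || {
            counter.fetch_add(1, Ordering::SeqCst);
        });
    }
    pool.execute();
    counter.load(Ordering::SeqCst)
}
```

The trade-off is that tasks cannot be added once `execute()` has been called, which matches how `Tasker` is used in the code above.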
