Failed to join thread after parallel traversal of a binary tree

I am trying to implement parallel traversal of a binary tree using a worker pool, but I got a problem when joining the threads:

thread '<unnamed>' panicked at library/std/src/sys/unix/thread.rs:265:13:
failed to join thread: Resource deadlock avoided (os error 11)

It seems like some thread join failed when the program finished (when the Drop implementation runs). To reproduce the problem, here is my code; you can run cargo run to reproduce it.


I don't know that this is the problem, but the first thing I'd do is get rid of the Arc<Mutex<Receiver>> and replace it with usage of crossbeam_channel, which provides a clonable Receiver. This way you're not blocking on receive and holding a lock.
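For example, an untested sketch of the worker loop with a cloned receiver (the names are illustrative, not from your code):

use crossbeam_channel::{unbounded, Receiver};
use std::thread::{self, JoinHandle};

type Job = Box<dyn FnOnce() + Send + 'static>;

fn spawn_worker(job_receiver: Receiver<Job>) -> JoinHandle<()> {
    thread::spawn(move || {
        // recv() blocks without holding any lock and returns Err
        // once every Sender has been dropped.
        while let Ok(job) = job_receiver.recv() {
            job();
        }
    })
}

fn main() {
    let (job_sender, job_receiver) = unbounded::<Job>();
    let workers: Vec<_> = (0..4).map(|_| spawn_worker(job_receiver.clone())).collect();
    job_sender.send(Box::new(|| println!("hello from a worker"))).unwrap();
    drop(job_sender); // close the channel so the workers exit
    for w in workers {
        w.join().unwrap();
    }
}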


Hi @kpreid, thanks for your reply; I will try the crossbeam_channel crate! The worker pool I implemented in this example just follows the example from the Rust book.
I want to ask whether this error can be ignored, or what the root cause of this problem might be. I cannot find any Rust example that shows the same error message (most of the examples are C++). Thanks ~

The error message might be obscure because it's operating system specific. I don't have any further suggestions.


My suspicion is that you're joining a thread from itself, as described in the documentation. Probably the last work pool Arc to expire is sometimes on one of your worker threads.

Your timing in main is misleading as the work pool can still be doing work when the initial call returns to main.

You may be able to address both by having some explicit wait-for-shutdown method instead of using Drop. I didn't attempt a full version, but the rough shape might be something like this (untested; the names assume a book-style pool with an Option<Sender<Job>> and workers holding Option<JoinHandle<()>>):
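impl WorkPool {
    pub fn wait_for_shutdown(&mut self) {
        // Dropping the sender closes the channel; each worker's recv()
        // then returns Err and its loop ends.
        drop(self.job_sender.take());
        // Joining happens on the caller's thread (e.g. main), so no
        // worker can ever be asked to join itself.
        for worker in &mut self.workers {
            if let Some(handle) = worker.thread.take() {
                handle.join().expect("worker thread panicked");
            }
        }
    }
}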


Probably unrelated, but parallel_visit also has some TOCTOU logic errors, where you lock something, do a check, drop the lock, and then assume your check is still valid when you lock things again later.

    if !worker_pool.lock().unwrap().is_full() {
        // ...time passes...
            // Returns without doing anything if full
            worker_pool.lock().unwrap().execute(move || {

I'm guessing the following one is inert due to how you're calling things, but could trigger in a different scenario. It's an unnecessary unlock-relock in any case.

    if root.lock().unwrap().left.is_some() {
        // ...time passes...
        parallel_visit(root.lock().unwrap().left.as_mut().unwrap(), worker_pool);
    }
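One way to avoid the unlock-relock is to take the guard once and keep it for both the check and the call (keeping your original &mut signature):

    let mut guard = root.lock().unwrap();
    if let Some(left) = guard.left.as_mut() {
        parallel_visit(left, worker_pool);
    }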

You also don't need &mut on root, so perhaps you were misunderstanding some implications.


Here's an alternative implementation (untested).

pub fn parallel_visit(root: &ThreadSafeNode, worker_pool: &ThreadSafeWorkPool) {
    let tsn_guard = root.lock().unwrap();
    let Node { left, right, .. } = &*tsn_guard;
    match (left.as_ref(), right.as_ref()) {
        (None, None) => {},
        // One child: nothing to parallelize, just descend.
        (Some(single), None) | (None, Some(single)) => {
            parallel_visit(single, worker_pool)
        }
        (Some(left), Some(right)) => {
            // Check fullness and dispatch under a single lock acquisition,
            // closing the TOCTOU gap described above.
            let mut wp_guard = worker_pool.lock().unwrap();
            if wp_guard.is_full() {
                drop(wp_guard);
                parallel_visit(left, worker_pool);
            } else {
                // Clone the Arcs so the job can own its data.
                let left = left.clone();
                let wp = worker_pool.clone();
                wp_guard.execute(move || parallel_visit(&left, &wp));
                drop(wp_guard);
            }
            parallel_visit(right, worker_pool);
        }
    }
}

pub fn recursive_visit(root: &ThreadSafeNode) {
    let tsn_guard = root.lock().unwrap();
    let Node { left, right, .. } = &*tsn_guard;
    match (left.as_ref(), right.as_ref()) {
        (None, None) => {},
        (Some(single), None) | (None, Some(single)) => {
            recursive_visit(single)
        }
        (Some(left), Some(right)) => {
            recursive_visit(left);
            recursive_visit(right);
        }
    }
}

I took a closer look at your work pool after that, and I believe it also has some inherent race conditions with the current implementation, also involving your counter. I think it needs a redo.


Hi @quinedot, thanks for all your replies, they helped me a lot. But I want to ask one more thing: you mentioned that there are some race conditions in my worker pool implementation. Could you tell me a little more about this? I thought that if I used locks there would not be any race conditions (obviously I am wrong, but I would like to know where I am wrong, thanks ~).


Let's say the counter is at 1 and you have multiple traversers (A, B) still going. This is a possible order of operations:

  • A checks counter in visit; it's 1. It calls execute
  • B checks counter in visit; it's 1. It calls execute
  • B checks counter in execute; it's 1. It shoots off the job
  • B's job is received and the counter is decremented
  • A checks counter in execute; it's 0. A's job gets silently dropped

If you do something like println!("P") in the parallel traverser and println!("R") in the recursive one, and then do

cargo run --release | sort | uniq -c

You'll probably see that the R counts are consistent whereas the P counts fall short.


Here's another (less buggy but not what you intended):

  • A checks counter in visit; it's 1. It calls execute
  • B checks counter in visit; it's 1. It calls execute
  • B checks counter in execute; it's 1. It shoots off the job
  • A checks counter in execute; it's 1. It shoots off the job
  • B's job is received and the counter is decremented
  • Nothing is available to receive A for a while

If you want to ensure the counter doesn't change, you have to keep it locked between checking and decrementing (or otherwise rethink the design).
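In code that means something like this (a sketch against your counter field, not your exact code):

    let mut counter_guard = worker_pool.counter.lock().unwrap();
    if *counter_guard > 0 {
        *counter_guard -= 1; // reserve a worker while still holding the lock
        drop(counter_guard);
        // ...dispatch the job...
    } else {
        drop(counter_guard);
        // ...no worker free: recurse on the current thread...
    }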


@quinedot Thanks for your help, it really helped me a lot. I think the original error I mentioned at the top of this page is caused by a self-join: the main thread would finish immediately while the worker pool was still working, and my jobs capture a reference count of the worker pool itself, which caused the worker pool to be dropped in one of the worker threads. So I added a channel to wait for all workers to finish their jobs, and it works without error.

use std::sync::{Arc, Mutex};
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread::{self, JoinHandle};

pub type Job = Box<dyn FnOnce() + Send + 'static>;
pub struct Woker {
    pub id: usize,
    pub thread: Option<JoinHandle<()>>,
}

impl Woker {
    pub fn new(
        id: usize, 
        job_receiver: Arc<Mutex<Receiver<Job>>>,
        finish_sender: Arc<Mutex<Sender<Result<(), ()>>>>,
        counter: Arc<Mutex<usize>>,
        size: usize,
    ) -> Self {
        Self {
            id,
            thread: Some(thread::spawn(move || {
                loop {
                    // The receiver's lock is a temporary here, so it is
                    // released before the job runs.
                    let result = job_receiver.lock().unwrap().recv();
                    match result {
                        Ok(job) => {
                            job();
                            let mut counter_guard = counter.lock().unwrap();
                            *counter_guard += 1;
                            if *counter_guard == size {
                               finish_sender.lock().unwrap().send(Ok(())).unwrap();
                            }
                            drop(counter_guard);
                        }
                        Err(_) => {
                            break;
                        }
                    }
                }
            }))
        }
    }
}

pub struct WokerPool {
    pub counter: Arc<Mutex<usize>>,
    workers: Vec<Woker>,
    job_sender: Option<Sender<Job>>,
}

pub type ThreadSafeWorkPool = Arc<WokerPool>;

impl WokerPool {
    pub fn new(size: usize, finish_sender: Sender<Result<(), ()>>) -> Self {
        let (job_sender, job_receiver) = channel::<Job>();
        let thread_safe_job_receiver = Arc::new(Mutex::new(job_receiver));
        let thread_safe_finish_sender = Arc::new(Mutex::new(finish_sender));
        let mut workers = Vec::with_capacity(size);
        let thread_safe_counter = Arc::new(Mutex::new(size));
        for i in 0..size {
            workers.push(Woker::new(
                i,
                Arc::clone(&thread_safe_job_receiver), 
                Arc::clone(&thread_safe_finish_sender), 
                Arc::clone(&thread_safe_counter),
                size,
             )
            )
        }
        Self {
            job_sender: Some(job_sender),
            workers,
            counter: thread_safe_counter,
        }
    }
    pub fn execute<F>(& self, f: F)
    where
        F: FnOnce() + Send + 'static
    {
        self.job_sender.as_ref().unwrap().send(Box::new(f)).unwrap();
    }
}
impl Drop for WokerPool {
    fn drop(&mut self) {
        drop(self.job_sender.take());
        for worker in &mut self.workers {
            // println!("Work id {} join", worker.id);
            if let Some(thread) = worker.thread.take() {
                 match thread.join() {
                    Ok(_) => {},
                    Err(_) => { println!("Error when join {}", worker.id) }
                 }
            }
        }
    }
}
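For completeness, main uses the pool roughly like this (simplified; the pool size here is arbitrary): the finish channel is created first, the traversal runs, and the receiver blocks until the workers signal that the counter is back to size, so the pool is only dropped (and joined) on the main thread.

let (finish_sender, finish_receiver) = channel();
let worker_pool = Arc::new(WokerPool::new(4, finish_sender));
parallel_visit(&root, &worker_pool);
finish_receiver.recv().unwrap(); // all dispatched jobs have completed
drop(worker_pool); // the last Arc is on the main thread, so Drop joins safely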

But one more problem occurred: my parallel visit is much slower than the sequential one (parallel is 2.4 ms, recursive is 390 µs, benchmarked with criterion).

pub type ThreadSafeWorkPool = Arc<WokerPool>;
pub fn parallel_visit(root: &ThreadSafeNode, worker_pool: &ThreadSafeWorkPool) {
    let tsn_guard = root.lock().unwrap();
    //println!("index : {}", tsn_guard.value);
    let Node { left, right, .. } = &*tsn_guard;
    match (left.as_ref(), right.as_ref()) {
        (None, None) => {},
        (Some(single), None) | (None, Some(single)) => {
            parallel_visit(single, worker_pool)
        }
        (Some(left), Some(right)) => {
            let mut counter_guard = worker_pool.counter.lock().unwrap();
            if *counter_guard == 0 {
                drop(counter_guard);
                parallel_visit(left, worker_pool);
                parallel_visit(right, worker_pool);
            } else {
                *counter_guard -= 1;
                drop(counter_guard);
                let wp_arc = Arc::clone(worker_pool);
                let left_arc = Arc::clone(left);
                worker_pool.execute(move || {
                    parallel_visit(&left_arc, &wp_arc);
                });
                parallel_visit(right, worker_pool)
            }
        }
    }
}
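For reference, the criterion benchmark is set up roughly like this (build_tree here is a stand-in for my actual tree construction):

use criterion::{criterion_group, criterion_main, Criterion};

fn traversal_benches(c: &mut Criterion) {
    let root = build_tree(); // hypothetical helper that builds the test tree
    c.bench_function("recursive_visit", |b| b.iter(|| recursive_visit(&root)));
    // The parallel case needs a fresh pool and finish channel per run as well.
}

criterion_group!(benches, traversal_benches);
criterion_main!(benches);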

But when I make the dispatched job run the recursive (sequential) function instead of parallel_visit, it is faster than the purely sequential version (parallel is 280 µs, recursive is 403 µs).

pub type ThreadSafeWorkPool = Arc<WokerPool>;

pub fn parallel_visit(root: &ThreadSafeNode, worker_pool: &ThreadSafeWorkPool) {
    let tsn_guard = root.lock().unwrap();
    //println!("index : {}", tsn_guard.value);
    let Node { left, right, .. } = &*tsn_guard;
    match (left.as_ref(), right.as_ref()) {
        (None, None) => {},
        (Some(single), None) | (None, Some(single)) => {
            parallel_visit(single, worker_pool)
        }
        (Some(left), Some(right)) => {
            let mut counter_guard = worker_pool.counter.lock().unwrap();
            if *counter_guard == 0 {
                drop(counter_guard);
                parallel_visit(left, worker_pool);
                parallel_visit(right, worker_pool);
            } else {
                *counter_guard -= 1;
                drop(counter_guard);
                // let wp_arc = Arc::clone(worker_pool);
                let left_arc = Arc::clone(left);
                worker_pool.execute(move || {
                    //parallel_visit(&left_arc, &wp_arc);
                    recursive_visit(&left_arc);
                });
                parallel_visit(right, worker_pool)
            }
        }
    }
}
pub fn recursive_visit(root: &ThreadSafeNode) {
    let tsn_guard = root.lock().unwrap();
    // println!("index: {}", tsn_guard.value);
    let Node { left, right, .. } = &*tsn_guard;
    match (left.as_ref(), right.as_ref()) {
        (None, None) => {},
        (Some(single), None) | (None, Some(single)) => {
            recursive_visit(single)
        }
        (Some(left), Some(right)) => {
            recursive_visit(left);
            recursive_visit(right);
        }
    }
}

I think the overhead of locking the counter and sending messages causes the difference. I would like to hear your advice ~ thanks ~


By the way, I added an Instant to measure the time cost of each job in the worker pool:

  let now = Instant::now();
  job();
  println!("Cost of job: {} (Work id {:?})", now.elapsed().as_nanos(), id);

I found that the jobs in the hybrid version (parallel visit combined with the recursive function) each take more time than the jobs in the purely parallel version; maybe the purely parallel version is slower overall because it sends many more jobs, locks more often, and keeps waiting on the counter.