Rayon par_bridge deadlock

I have a queue of work that feeds back into itself: processing an item can generate more work.

The problem domain is directory traversal. Step one is to put the root directory into the queue. The work is to read that directory's entries. Any entry that is itself a directory is then put back into the queue as more work.

The queued work is exposed through an iterator, and I'm using rayon's par_bridge to process it in parallel.

I'm seeing an occasional deadlock. I think I've narrowed the problem down to a loop in rayon's par_bridge implementation (quoted in my follow-up below).

In particular, it calls iter.next() multiple times without necessarily giving the op passed into rayon a chance to process any items. This causes problems because rayon can end up holding an item that would generate new work. If the work queue is empty at that point, things simply stall: the work isn't done, but the item that would generate that new work is being held in rayon's internal worker queue.

Here's a short example that shows the problem.

The problem is easiest to see and reproduce in a single-threaded pool. In that case it's obvious that an item pushed via worker.push(it) can never be stolen by another thread, so things stall every time. But I think the same basic problem can happen with the right workload and multiple threads in the pool.

use crossbeam::channel::{unbounded, Receiver, TryRecvError};
use rayon::ThreadPoolBuilder;
use rayon::prelude::*;

struct WorkQueue {
    todo: usize,                // how many items this queue still expects to yield
    receiver: Receiver<usize>,
}

impl Iterator for WorkQueue {
    type Item = usize;
    
    fn next(&mut self) -> Option<usize> {
        // Spin until an item arrives or all expected items have been yielded.
        loop {
            if self.todo == 0 {
                return None
            }

            match self.receiver.try_recv() {
                Ok(each) => {
                    self.todo -= 1;
                    return Some(each)
                }
                Err(err) => match err {
                    TryRecvError::Empty => {},
                    TryRecvError::Disconnected => return None,
                },
            }
        }
    }
}
    
fn main() {
    let pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap();
    let (sender, receiver) = unbounded();
    let work_queue = WorkQueue {
        todo: 2, // this queue should generate two items
        receiver,
    };

    let _ = sender.send(0); // queue starts with one item

    pool.install(|| {
        work_queue.par_bridge().for_each( |_| {
            let _ = sender.send(0); // processing that first item should generate the second item... but this is never called.
        });
    });

    println!("Hello, world!");
}

(Playground)

What next? Is this expected to work, or should I be using a different design?

Thanks,
Jesse

Sorry, it's late, so I haven't really followed your code, but it sounds similar to this: Thread Pool Self-Induced Deadlocks - DZone Java


I view your iterator as the problem. A queue built on a receiver is better matched to a while loop. A compromise might be to use an inner iterator that does not block:

while !work_queue.is_empty() {
    work_queue.iter().par_bridge()...
}
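
Fleshed out, that might look something like this (a rough sketch, reusing the crossbeam channel from your example; Drain and process are made-up names). Each pass of the outer loop drains whatever is already queued through par_bridge, and items generated during one wave are picked up by the next:

use crossbeam::channel::{unbounded, Receiver, Sender};
use rayon::prelude::*;

// Non-blocking view of the queue: yields only what is already queued.
struct Drain<'a>(&'a Receiver<usize>);

impl Iterator for Drain<'_> {
    type Item = usize;

    fn next(&mut self) -> Option<usize> {
        // Empty and Disconnected both end this wave; the outer loop
        // decides whether another wave is needed.
        self.0.try_recv().ok()
    }
}

// Stand-in for real work; processing an item may enqueue more work.
fn process(item: usize, sender: &Sender<usize>) {
    if item < 2 {
        let _ = sender.send(item + 1);
    }
}

fn main() {
    let (sender, receiver) = unbounded();
    let _ = sender.send(0);

    // Each iteration drains the currently queued items in parallel; work
    // generated during one wave is handled by the next wave.
    while !receiver.is_empty() {
        Drain(&receiver).par_bridge().for_each(|item| process(item, &sender));
    }
}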

par_bridge is batching this way in order to amortize the synchronization overhead, so it's hard to tune this in a general way. I think pulling just one item at a time would be prohibitive, though. There's also a sort of priority-inversion problem possible, like this Mutex bug, where work stealing can get you into a situation where a job is waiting on some result from a suspended task on the same stack, causing a deadlock.

Servo's layout algorithm has a similar sort of nested work, and it uses rayon::scope to issue additional spawns recursively. Each of those tasks executes independently, without worrying about nesting on the stack and such.
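
Roughly like this, applied to your directory traversal (a minimal sketch of the pattern, not Servo's actual code):

use rayon::Scope;
use std::fs;
use std::path::PathBuf;

// Visit one directory, spawning an independent task per subdirectory.
fn visit<'s>(scope: &Scope<'s>, dir: PathBuf) {
    let entries = match fs::read_dir(&dir) {
        Ok(entries) => entries,
        Err(_) => return,
    };
    for entry in entries.flatten() {
        let path = entry.path();
        if path.is_dir() {
            // Recursion happens through the scope, so there is no shared
            // iterator or queue that can stall on buffered items.
            scope.spawn(move |scope| visit(scope, path));
        } else {
            // ... process the file ...
        }
    }
}

fn main() {
    // rayon::scope blocks until every spawned task (and its spawns) finish.
    rayon::scope(|scope| visit(scope, PathBuf::from(".")));
}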


@cuviper @jonh Thanks for your help. I'll report back once I've had a chance to try.

Is there an interface that exposes semantics similar to Receiver's try_recv?

pub fn try_recv(&self) -> Result<T, TryRecvError>
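
To make the question concrete, the interface I have in mind would look roughly like this (entirely hypothetical; nothing like it exists in std or rayon today):

use crossbeam::channel::TryRecvError;

// Hypothetical trait: like Iterator, but distinguishes "nothing available
// right now" (Empty) from "no more items will ever arrive" (Disconnected).
trait TryIterator {
    type Item;

    fn try_next(&mut self) -> Result<Self::Item, TryRecvError>;
}

// The WorkQueue from my example could implement it without spinning:
impl TryIterator for WorkQueue {
    type Item = usize;

    fn try_next(&mut self) -> Result<usize, TryRecvError> {
        if self.todo == 0 {
            return Err(TryRecvError::Disconnected);
        }
        match self.receiver.try_recv() {
            Ok(each) => {
                self.todo -= 1;
                Ok(each)
            }
            Err(err) => Err(err),
        }
    }
}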

If that existed or could be added, then I think this rayon code:

for _ in 0..count {
    if let Some(it) = iter.next() {
        worker.push(it);
    } else {
        self.done.store(true, Ordering::SeqCst);
        break;
    }
}

Could be changed to:

for _ in 0..count {
    match iter.try_next() {
        Ok(it) => worker.push(it),
        // Nothing queued right now: stop filling, but leave `done` unset
        // so a later call can try again once more work arrives.
        Err(TryRecvError::Empty) => break,
        // The queue is finished: record completion as before.
        Err(TryRecvError::Disconnected) => {
            self.done.store(true, Ordering::SeqCst);
            break;
        }
    }
}

And I think that would solve the deadlock I'm seeing, while still allowing the normal case of caching (count * count) * 2 items when possible.

Of course there may be lots I'm not thinking about, but I've modified the code locally and it seems to be working well. I get the same performance, avoid the deadlock case, and can keep using the simple par_bridge API instead of worrying about scopes and other things I don't understand very well.

There's nothing like try_next for general iterators, but you could certainly fork your own par_bridge using try_recv as you suggest. While that may work fine in your case, I don't think we'd want to add that to rayon for general use, because it would be really easy to induce deadlocks like I mentioned before.

You could also try traversing your subdirectories with recursive rayon calls, rather than trying to flatten them into a single iterator.
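
For example, something along these lines (a sketch, not tuned; collecting each directory's entries first gives rayon a sized collection it can split):

use rayon::prelude::*;
use std::fs;
use std::path::PathBuf;

// Recursively walk a directory tree with nested parallel iterators.
fn walk(dir: PathBuf) {
    let entries: Vec<PathBuf> = match fs::read_dir(&dir) {
        Ok(entries) => entries.flatten().map(|e| e.path()).collect(),
        Err(_) => return,
    };
    entries.into_par_iter().for_each(|path| {
        if path.is_dir() {
            // Nested rayon calls compose; no flattened iterator required.
            walk(path);
        } else {
            // ... process the file ...
        }
    });
}

fn main() {
    walk(PathBuf::from("."));
}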
