Properly distributing workload from VecDeque among threads

The scenario:

  • There is a queue - a VecDeque - where each record is a file path
  • The queue is wrapped in an Arc and a Mutex to allow concurrent pops
  • 4 threads each pop a path from the queue, read the file at that path, and process it
  • This continues until the queue is empty

Below is the main segment of the queue handling. The program compiles and runs without errors, but it does not distribute work evenly across the threads: 3 of the 4 threads process only 1 file each, while the remaining thread processes all the others. (There are more than 500 files.)
How do I fix it?

    // file_paths is a Vec<DirEntry>
    let shared_queue = Arc::new(Mutex::new(VecDeque::from(file_paths)));

    let threads: Vec<_> = (0..4).map(|_i| {
        let local_queue = shared_queue.clone();
        thread::spawn(move || {
            loop {
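                // the MutexGuard from lock() lives only inside this match,
                // so the lock is already released before process_file runs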
                let dir_entry = match local_queue.lock() {
                    Ok(mut queue) => queue.pop_front(),
                    Err(_) => panic!("Mutex poisoned"),
                };
                match dir_entry {
                    // process_file reads each file and does some 
                    // computations on the read data and 
                    // does not mutate the file
                    Some(entry) => process_file(entry),
                    None => break
                }
            };
        })
    }).collect();

    for t in threads {
        t.join().unwrap();
    }

Note: while there are options like Rayon and ThreadPool, I am doing this as an exercise.

Not sure if this is the cleanest way, but changing the match dir_entry {...} clause to the segment below gets it working with proper distribution.

I was hoping that just drop(entry) (which I assumed would release the lock) would do the trick, but the problem persists unless I also add a thread::sleep, which is somewhat perplexing.

    match dir_entry {
        Some(entry) => {
            let path = entry.path();
            drop(entry);
            thread::sleep(Duration::from_micros(1));
            process_file(path, i);
        },
        None => break
    }

Lock types come with different fairness guarantees. It's quite likely that the lock you're using there is unfair; it could even be biased toward re-acquisition by the thread that just released it.
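
If you want to see this for yourself, here is a minimal, self-contained sketch (my construction, not your code) that strips the per-item work out entirely and just counts how many items each thread manages to pop. With an unfair lock and zero per-item work, one thread will often take almost everything:

    use std::collections::VecDeque;
    use std::sync::{Arc, Mutex};
    use std::thread;

    fn main() {
        // 10_000 dummy items instead of file paths, and no per-item work at all
        let queue = Arc::new(Mutex::new((0..10_000u32).collect::<VecDeque<_>>()));

        let handles: Vec<_> = (0..4).map(|i| {
            let queue = Arc::clone(&queue);
            thread::spawn(move || {
                let mut popped = 0u32;
                loop {
                    // the guard is a temporary here, so the lock is held
                    // only for the pop itself
                    let item = queue.lock().unwrap().pop_front();
                    match item {
                        Some(_) => popped += 1, // zero work: maximum contention
                        None => break,
                    }
                }
                (i, popped)
            })
        }).collect();

        for h in handles {
            let (i, popped) = h.join().unwrap();
            println!("thread {i} popped {popped} items");
        }
    }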

While you could look for other kinds of locks in the ecosystem that have different characteristics, I would instead recommend looking at channels for work distribution and completion.

The stdlib only has a multi-producer, single-consumer channel. The pattern required here is single-producer, multi-consumer. There are crates that offer it, but I am trying to stick with the stdlib only (as an exercise).

Python, for example, has an MPMC queue implementation in its stdlib.

Is your process_file really fast? If so, the threads might be contending on the lock. Maybe you want to look into some profiling.

Also, it's fine doing this std-only as an exercise, but in real life I usually use crossbeam-channel: it doesn't need the locking and it is MPMC.
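
For comparison, here is a rough sketch of what a crossbeam-channel version could look like. The path strings and the process_file stub are placeholders, and crossbeam-channel 0.5 in Cargo.toml is assumed:

    // Cargo.toml: crossbeam-channel = "0.5" (assumed)
    use std::thread;

    fn main() {
        // placeholder input; in the real program these would come from read_dir
        let paths: Vec<String> = (0..500).map(|i| format!("file_{i}.txt")).collect();

        // bounded so the producer cannot run arbitrarily far ahead of the workers
        let (tx, rx) = crossbeam_channel::bounded::<String>(64);

        let workers: Vec<_> = (0..4).map(|_| {
            let rx = rx.clone(); // the Receiver is Clone: every worker pulls from the same channel
            thread::spawn(move || {
                // recv() returns Err once all senders are dropped and the channel is drained
                while let Ok(path) = rx.recv() {
                    process_file(&path);
                }
            })
        }).collect();

        for path in paths {
            tx.send(path).unwrap();
        }
        drop(tx); // tells the workers there is no more work

        for w in workers {
            w.join().unwrap();
        }
    }

    // stand-in for the real per-file work
    fn process_file(path: &str) {
        println!("processing {path}");
    }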

I echo the crossbeam sentiment; I've never felt much need to artificially restrict myself to std-only.

How about this: you could add a dealer thread that workers ask for work over an MPSC channel whenever they are free, with the work parcels sent back over a per-worker SPSC channel.

A bit chatty, but probably beats reimplementing crossbeam from scratch.
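
Roughly, a std-only sketch of what I mean, with one deliberate simplification: instead of a long-lived per-worker channel, every request carries a fresh one-shot reply channel, so the dealer can signal "no more work" simply by dropping the reply sender. The paths and process_file are placeholders:

    use std::collections::VecDeque;
    use std::sync::mpsc;
    use std::thread;

    fn main() {
        // placeholder work items; in the real program these would be file paths
        let mut work: VecDeque<String> = (0..500).map(|i| format!("file_{i}.txt")).collect();

        // MPSC channel: every worker asks the dealer for a parcel on this channel;
        // the "request" is simply a one-shot Sender the dealer replies on
        let (req_tx, req_rx) = mpsc::channel::<mpsc::Sender<String>>();

        let workers: Vec<_> = (0..4).map(|_| {
            let req_tx = req_tx.clone();
            thread::spawn(move || loop {
                let (reply_tx, reply_rx) = mpsc::channel::<String>();
                if req_tx.send(reply_tx).is_err() {
                    break; // dealer is gone
                }
                match reply_rx.recv() {
                    Ok(path) => process_file(&path),
                    Err(_) => break, // dealer dropped the reply sender: queue is empty
                }
            })
        }).collect();
        drop(req_tx); // only the workers hold request senders now

        // the dealer: answer each request with one path, or with silence (a drop) when empty
        let dealer = thread::spawn(move || {
            while let Ok(reply_to) = req_rx.recv() {
                if let Some(path) = work.pop_front() {
                    let _ = reply_to.send(path);
                }
                // when the queue is empty, reply_to is dropped here, unblocking that worker
            }
        });

        dealer.join().unwrap();
        for w in workers {
            w.join().unwrap();
        }
    }

    // stand-in for the real per-file work
    fn process_file(path: &str) {
        println!("processing {path}");
    }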

It was indeed a case of process_file being too fast. That also led to another problem: the channel was being filled faster than the receiver could drain it, leading to OOM situations. I fixed it by using a bounded sync_channel. Having said that, in production code of any sort I would prefer a crate like crossbeam.
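
A minimal illustration of the bounded-channel behaviour (not my exact code): with std::sync::mpsc::sync_channel, send blocks once the buffer is full, so the producer can never run far ahead of a slow consumer. The sleep is only there to make the consumer artificially slow:

    use std::sync::mpsc;
    use std::thread;
    use std::time::Duration;

    fn main() {
        // send() blocks once 8 items are in flight, so memory use stays bounded
        let (tx, rx) = mpsc::sync_channel::<String>(8);

        let consumer = thread::spawn(move || {
            while let Ok(item) = rx.recv() {
                thread::sleep(Duration::from_millis(1)); // pretend the consumer is slow
                println!("consumed {item}");
            }
        });

        for i in 0..100 {
            tx.send(format!("item {i}")).unwrap(); // blocks while the buffer is full
        }
        drop(tx); // lets the consumer's recv() return Err and the loop end

        consumer.join().unwrap();
    }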

Thanks, everyone!
