Failing to send data into a channel

Hmmm... So it looks like you do get parallelism at the beginning, but then a long sequential tail at the end. I wonder what is causing this.

I wish I could take a closer look at what this program is doing with a time-resolved profiling tool, but sadly you are not allowed to share enough for me to do so. If you want to investigate on your side, though, WPA (Windows Performance Analyzer) and its predecessor xPerf were two nice Windows profiling tools the last time I developed on that operating system.


As a cruder approach, you can make 4 copies of each PDF file and have your program analyze them, arranging the vector of filenames in such a fashion that you get the first copy of every file, then the second copy of every file, and so on. This should make each of the Rayon worker threads go through the entire input dataset (if you want to guarantee this, you can reuse the par_chunks() approach that you had in the beginning).

If this results in significantly increased CPU utilization, then load imbalance in the input dataset is likely to be the issue, and we can look into parallelizing the processing of individual files to compensate (I have some ideas for that based on rayon::scope()). If the CPU utilization pattern remains more or less the same, then I would look more in the direction of synchronization and IO bottlenecks.

The reason why I suggest that you make full-blown file copies, instead of just duplicating filenames, is that I'm wary of the underlying PDF processing library silently holding file-based locks. But you can try that approach as a first step if you like.
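For the copy-and-interleave experiment described above, here is a minimal std-only sketch of how the file list could be built. The `_copyN` naming scheme and the helper names (`copy_path`, `interleaved_copy_paths`, `make_interleaved_copies`) are hypothetical, chosen only for illustration. It physically copies each file so that no two entries share an underlying file, then returns the paths ordered as "copy 0 of every file, then copy 1 of every file, and so on".

```rust
use std::fs;
use std::io;
use std::path::{Path, PathBuf};

/// Hypothetical naming scheme: `dir/report.pdf` -> `dir/report_copy2.pdf`.
fn copy_path(original: &Path, k: usize) -> PathBuf {
    let stem = original.file_stem().unwrap_or_default().to_string_lossy();
    let name = match original.extension() {
        Some(ext) => format!("{}_copy{}.{}", stem, k, ext.to_string_lossy()),
        None => format!("{}_copy{}", stem, k),
    };
    original.with_file_name(name)
}

/// Orders the copies as: copy 0 of every file, then copy 1 of every file, etc.
fn interleaved_copy_paths(files: &[PathBuf], copies: usize) -> Vec<PathBuf> {
    (0..copies)
        .flat_map(|k| files.iter().map(move |f| copy_path(f, k)))
        .collect()
}

/// Makes `copies` full-blown copies of every file on disk (so the PDF library
/// never sees the same file twice), and returns them in interleaved order.
fn make_interleaved_copies(files: &[PathBuf], copies: usize) -> io::Result<Vec<PathBuf>> {
    for k in 0..copies {
        for f in files {
            fs::copy(f, copy_path(f, k))?;
        }
    }
    Ok(interleaved_copy_paths(files, copies))
}
```

The returned vector can then be fed to the existing `process_files()` in place of the original list. For the cheaper "just duplicate filenames" variant, you would instead repeat the original paths `copies` times in the same interleaved order, without calling `fs::copy`.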


Concerning possible synchronization bottlenecks, note that there is one implicit lock in your current code: println!(). Standard output sits behind a global lock in Rust, which every println!() call acquires, so if your program prints a lot of output, your worker threads could end up queuing on that lock. To check whether that is the issue here, you can temporarily remove the inspect() statement from process_files:

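```rust
use rayon::prelude::*;
use regex::Regex;
use std::path::PathBuf;

pub fn process_files(query: &Query, files: &[PathBuf]) {
    // `&query.pattern` compiles whether `pattern` is a `String` or a `&str`
    let rx = Regex::new(&query.pattern).unwrap();
    // The per-file inspect()/println!() step is removed for this experiment,
    // so the worker threads no longer contend on the stdout lock.
    let total = files
        .par_iter()
        .filter_map(|filename| process_file(filename, &rx))
        .count();
    println!("total {}", total);
}
```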
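If removing the output turns out to help but you still want per-file output, one option (my suggestion, not something your code does today) is to format each file's output into a single `String` and write it through one locked handle, so each file takes the stdout lock once instead of once per line. `write_file_report` and its output format are hypothetical:

```rust
use std::io::{self, Write};

/// Hypothetical helper: formats all of one file's matches into a single
/// String, then writes it to `out` with one write_all() call.
fn write_file_report<W: Write>(out: &mut W, filename: &str, matches: &[String]) -> io::Result<()> {
    let mut report = String::new();
    for m in matches {
        report.push_str(filename);
        report.push_str(": ");
        report.push_str(m);
        report.push('\n');
    }
    out.write_all(report.as_bytes())
}
```

In a worker, you would call it as `write_file_report(&mut std::io::stdout().lock(), name, &matches)`, which holds the stdout lock only for that one write, and also keeps each file's lines together in the output.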

Well, the issue at hand is that Rayon is designed to hide the worker threads from you, but you actually want it to do something once per worker thread. Since your Dpl object's constructor takes no parameters, this sounds like one of the few legitimate uses for thread-local storage:

```rust
use std::cell::RefCell;

// Declare thread-local storage and specify how to initialize it
thread_local! {
    // Need a RefCell here because LocalKey::with() only hands out a shared
    // reference (&T) to the thread-local value, so mutating the Dpl requires
    // interior mutability, with the borrow rules checked at run time.
    static DPL: RefCell<dpl::Dpl> = RefCell::new(dpl::Dpl::new().unwrap());
}

// ...

// Use the thread-local storage in your code (the first access on each
// thread lazily runs the initializer above)
DPL.with(|dpl_cell| {
    // borrow_mut() returns a RefMut guard that derefs to dpl::Dpl
    // (use `&mut *dpl` where a `&mut dpl::Dpl` is expected). It panics
    // if this thread's Dpl is already borrowed.
    let mut dpl = dpl_cell.borrow_mut();

    // Now you can do your PDF processing as before!
});
```
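To see the once-per-thread behaviour in isolation, here is a self-contained sketch that uses plain `std::thread::scope` instead of Rayon, and a hypothetical stand-in `Dpl` type (the real `dpl::Dpl` API is not shown in this thread, so `new()` and `process()` below are made up). A global counter records how many instances get constructed:

```rust
use std::cell::RefCell;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;

// Counts how many Dpl instances were constructed across all threads.
static CONSTRUCTED: AtomicUsize = AtomicUsize::new(0);

/// Hypothetical stand-in for dpl::Dpl: costly to build, needs &mut self.
struct Dpl {
    processed: usize,
}

impl Dpl {
    fn new() -> Result<Dpl, String> {
        CONSTRUCTED.fetch_add(1, Ordering::SeqCst);
        Ok(Dpl { processed: 0 })
    }

    fn process(&mut self, _file: &str) {
        self.processed += 1;
    }
}

thread_local! {
    static DPL: RefCell<Dpl> = RefCell::new(Dpl::new().unwrap());
}

/// Processes one file with this thread's Dpl, creating it on first use.
fn process_file(file: &str) {
    DPL.with(|dpl_cell| {
        let mut dpl = dpl_cell.borrow_mut();
        dpl.process(file);
    });
}

/// Spawns `n_threads` threads that each process `files_per_thread` files,
/// then returns how many Dpl instances were built in total.
fn run(n_threads: usize, files_per_thread: usize) -> usize {
    thread::scope(|s| {
        for t in 0..n_threads {
            s.spawn(move || {
                for i in 0..files_per_thread {
                    process_file(&format!("thread{}_file{}.pdf", t, i));
                }
            });
        }
    });
    CONSTRUCTED.load(Ordering::SeqCst)
}
```

With 4 threads processing 10 files each, `run(4, 10)` should report 4 constructions rather than 40. Calling the same `process_file()` from `files.par_iter().for_each(...)` should likewise build at most one Dpl per Rayon worker thread, because those workers are long-lived threads that keep their thread-local state between jobs.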

Indeed, I do not see anything in the part of the Python code that you shared which looks obviously much more clever than what the Rust code does.

One interesting part is that the algorithm used for polling futures is likely to generate high CPU utilization in the master process: it essentially spins for 0.5 s, then sleeps for 0.1 s and starts over, so the master process is consuming CPU 0.5 / (0.5 + 0.1) = 5/6 of the time (about 83%) even when nothing is happening. But this is not enough to explain why the worker processes have high CPU utilization as well, let alone why they are faster.

Can you share a bit more of the _search function? I need to know more precisely what the worker processes are doing in the Python version in order to compare it with the Rust version.


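EDIT: Oh, and one last thing. To check whether you have an IO issue on your hands, you may want to ask Rayon to oversubscribe the CPU a bit. Since build_global() returns an error if the global thread pool has already been initialized, put this at the very start of main(), before any parallel iterator runs:

```rust
rayon::ThreadPoolBuilder::new().num_threads(8).build_global().unwrap();
```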

If this improves the CPU utilization and running time, then the PDF library is likely blocking for IO. If the CPU utilization remains similar or running times worsen, then a synchronization or load imbalance bottleneck is more likely.
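If you would rather express the oversubscription relative to the machine than hard-code 8, a small std-only helper can derive the thread count from `std::thread::available_parallelism()`. The name `oversubscribed_thread_count` and the default factor of 2 are my own choices for illustration:

```rust
use std::num::NonZeroUsize;
use std::thread;

/// Number of threads to request when deliberately oversubscribing the CPU:
/// `factor` threads per hardware thread reported by the OS (treated as 1
/// hardware thread if the OS cannot tell us). A factor of 0 is treated as 1.
fn oversubscribed_thread_count(factor: usize) -> usize {
    let hw = thread::available_parallelism()
        .map(NonZeroUsize::get)
        .unwrap_or(1);
    hw * factor.max(1)
}

// With Rayon, the result would go into the same builder call as above:
// rayon::ThreadPoolBuilder::new()
//     .num_threads(oversubscribed_thread_count(2))
//     .build_global()
//     .unwrap();
```

Trying a couple of factors (say 1, 2 and 4) and comparing running times makes the IO-vs-synchronization diagnosis above a bit more robust than a single data point.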
