Failing to send data into a channel

Like @vitalyd, I would be a bit surprised if this were all there is to the issue, considering that the multiprocessing version should then face more or less the same load imbalance problems as the multithreading one.

But you can certainly check this hypothesis by measuring the time it takes to process each individual file, for example via a suitably instrumented version of process_file, then doing some statistics on the resulting timing measurements.
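
A minimal sketch of what that instrumentation could look like, using only the standard library. The names `timed`, `TimingStats` and `timing_stats` are made up for illustration and are not part of your code; the idea is simply to wrap each call and summarize the collected durations afterwards.

```rust
use std::time::{Duration, Instant};

/// Runs `f` and returns its result together with the elapsed wall-clock time.
fn timed<T>(f: impl FnOnce() -> T) -> (T, Duration) {
    let start = Instant::now();
    let result = f();
    (result, start.elapsed())
}

/// Summary statistics over per-file timings, in seconds.
#[derive(Debug)]
struct TimingStats {
    min: f64,
    median: f64,
    max: f64,
    mean: f64,
}

/// Summarizes the measurements; returns `None` if there are none.
fn timing_stats(durations: &[Duration]) -> Option<TimingStats> {
    if durations.is_empty() {
        return None;
    }
    let mut secs: Vec<f64> = durations.iter().map(Duration::as_secs_f64).collect();
    secs.sort_by(|a, b| a.total_cmp(b));
    let n = secs.len();
    let median = if n % 2 == 1 {
        secs[n / 2]
    } else {
        (secs[n / 2 - 1] + secs[n / 2]) / 2.0
    };
    Some(TimingStats {
        min: secs[0],
        median,
        max: secs[n - 1],
        mean: secs.iter().sum::<f64>() / n as f64,
    })
}
```

Each call would then become something like `let (reply, took) = timed(|| process_file(&filename, &rx));`, with `took` pushed into a vector. A maximum far above the median would suggest that a few large files dominate the run.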


If that turns out to be an issue, then as I mentioned earlier, you could try parallelizing over the pages in a file in addition to parallelizing over files. Don't worry about the nested parallel loops: Rayon handles them efficiently, and they are what you want if some files are much bigger than others.

Here is the simplest way to do this that I could think of. It is not the most efficient parallelization scheme possible (as collecting the text then processing it is unnecessarily memory intensive and sequential compared to a "streaming" approach where you send work to the threads as you go), but it is easiest to understand and should be enough to evaluate if this track is worth pursuing.

// Usual "I could not test this" disclaimer applies again here :)
use std::cell::RefCell;
use std::path::PathBuf;

use rayon::prelude::*;
use regex::Regex;

fn process_file<'a>(filename: &'a PathBuf, rx: &'a Regex)
                    -> Option<Reply<'a>> {
    thread_local! {
        static DPL: RefCell<dpl::Dpl> = RefCell::new(
            dpl::Dpl::new().unwrap());
    }
    DPL.with(|dpl_cell| {
        let dpl = dpl_cell.borrow_mut();
        if let Ok(mut pdf) = dpl.da_open_file_readonly(
                &filename.to_string_lossy()) {
            // ASSUMES is_regex == true & that we want every matching page

            // Collect the full PDF text into a vec (this part needs to
            // be sequential as the "pdf" object is stateful)
            let num_pages = pdf.da_get_page_count();
            // Keep each page number next to its text, so that the reported
            // numbers stay right even if da_set_page() fails for some page
            // (the "as usize" cast is a no-op if num_pages is already a usize)
            let mut pages_text = Vec::with_capacity(num_pages as usize);
            for pnum in 1..num_pages + 1 {
                if pdf.da_set_page(pnum) {
                    // NOTE: May need a to_owned() there if the pdf text
                    //       extraction does not produce an owned String
                    pages_text.push((pnum, pdf.da_extract_page_text()));
                }
            }

            // Carry out regex analysis on the extracted text, possibly in parallel
            // (Rayon should only split this workload if workers are starving)
            let pages = pages_text.par_iter()
                                  .filter(|(_, text)| rx.is_match(text))
                                  .map(|(pnum, _)| *pnum as i32)
                                  .collect::<Vec<_>>();

            // Do the rest as usual
            if !pages.is_empty() {
                return Some(Reply{filename, pages});
            }
        }
        None
    })
}
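
For comparison, here is a rough sketch of the "streaming" alternative, where pages are handed to worker threads as soon as they are extracted instead of being collected first. It deliberately uses plain std threads and channels rather than Rayon, and everything in it is an assumption made for illustration: the function name `matching_pages_streaming`, the `is_match` closure standing in for `rx.is_match`, and the `pages` iterator standing in for the sequential `da_set_page` / `da_extract_page_text` loop. It expects at least one worker.

```rust
use std::sync::mpsc;
use std::sync::Mutex;
use std::thread;

/// Streams pages to `num_workers` threads as they are produced and returns
/// the sorted numbers of the pages for which `is_match` returns true.
fn matching_pages_streaming<I, F>(pages: I, num_workers: usize, is_match: F) -> Vec<i32>
where
    I: Iterator<Item = (i32, String)>,
    F: Fn(&str) -> bool + Sync,
{
    let (page_tx, page_rx) = mpsc::channel::<(i32, String)>();
    let (hit_tx, hit_rx) = mpsc::channel::<i32>();
    // A std Receiver cannot be shared directly, so workers take turns on it.
    let page_rx = Mutex::new(page_rx);

    thread::scope(|scope| {
        for _ in 0..num_workers {
            let hit_tx = hit_tx.clone();
            let page_rx = &page_rx;
            let is_match = &is_match;
            scope.spawn(move || loop {
                // The lock is held only while fetching the next page.
                let next = page_rx.lock().unwrap().recv();
                match next {
                    Ok((page_number, text)) => {
                        if is_match(&text) {
                            hit_tx.send(page_number).unwrap();
                        }
                    }
                    // The producer hung up: no more pages will arrive.
                    Err(_) => break,
                }
            });
        }
        // Only the workers' clones should keep the result channel open.
        drop(hit_tx);

        // Sequential producer: runs on the calling thread, which is where
        // the stateful pdf object would live.
        for page in pages {
            page_tx.send(page).unwrap();
        }
        drop(page_tx);
    });

    let mut hits: Vec<i32> = hit_rx.iter().collect();
    hits.sort_unstable();
    hits
}
```

Results arrive in whatever order the workers finish, hence the final sort. Whether this beats the collect-then-process version depends on how expensive text extraction is compared to matching, so it is worth measuring before committing to it.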

EDIT: If, as I suspect, it turns out that the multiprocessing version is still faster, you might be able to speed it up further by spawning a pool of processes at the beginning and keeping it alive until the end, as the Python multiprocessing module does.

Here is a sketch of how this could work:

  • Instead of collecting all input from stdin at once, workers fetch stdin input line by line.
  • Whenever they are done processing a file, workers emit a corresponding result on stdout (even if the result says that there was no match), before fetching the next filename from stdin.
  • Using these stdout responses, the manager can keep an up-to-date record of how many files are queued for processing in each worker process and use that to:
    1. Block if all processes already have a sufficient backlog of files.
    2. Otherwise, send work to the least loaded worker.
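
The bookkeeping on the manager side could be sketched roughly as follows. This only covers the "who gets the next file" decision, not the process spawning or the stdin/stdout plumbing, and the `Dispatcher` type with its `max_backlog` threshold is a hypothetical name introduced for this example.

```rust
/// Tracks how many files each worker process has queued but not yet answered.
struct Dispatcher {
    backlog: Vec<usize>,
    max_backlog: usize,
}

impl Dispatcher {
    fn new(num_workers: usize, max_backlog: usize) -> Self {
        Dispatcher { backlog: vec![0; num_workers], max_backlog }
    }

    /// Picks the least loaded worker and records one more queued file for it.
    /// Returns `None` when every worker is at `max_backlog`, in which case
    /// the manager should wait for a result on some worker's stdout first.
    fn assign(&mut self) -> Option<usize> {
        let (worker, &load) = self
            .backlog
            .iter()
            .enumerate()
            .min_by_key(|&(_, &load)| load)?;
        if load >= self.max_backlog {
            return None;
        }
        self.backlog[worker] += 1;
        Some(worker)
    }

    /// Records that `worker` emitted a result on stdout.
    fn complete(&mut self, worker: usize) {
        self.backlog[worker] = self.backlog[worker].saturating_sub(1);
    }
}
```

The manager would call `assign()` before writing a filename to a worker's stdin and `complete()` each time it reads a result line from that worker.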

I don’t think I’m going to get any faster than the under 4 minutes I’m now achieving with Rust, because basically all of that time is consumed by the DLL. Keep in mind that Python’s multiprocessing module is very efficient (and has been heavily optimized on Windows as well as Unix).

I also rather like the rayon + process approach because I can safely ‘block’ each thread waiting for its subprocess to finish, knowing that any working thread will be able to continue.

I started learning Rust in December and I’m finding it a really seductive language. The compiler can be very helpful, and sometimes very confusing (try doing myvector.append(anitem) and see the error message, which doesn’t mention that you need push() in this context). The crate documentation also varies considerably: the csv crate’s is probably the best I’ve seen, while HashMap’s is not so good, and I couldn’t figure out how to use a faster hasher until I found the fnv crate. The O’Reilly book is fairly good (although I’m hoping the official book will be better). Best of all, though, has been this forum: I’d never have got so far without your help!
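
For anyone who hits the same two snags, here is a small sketch of both. The `Fnv1a` hasher is written out by hand only to keep the example dependency-free (the fnv crate ships a ready-made one), and the names `FnvMap`, `word_counts` and `push_vs_append` are made up for illustration.

```rust
use std::collections::HashMap;
use std::hash::{BuildHasherDefault, Hasher};

/// Minimal 64-bit FNV-1a hasher, for illustration only.
struct Fnv1a(u64);

impl Default for Fnv1a {
    fn default() -> Self {
        Fnv1a(0xcbf2_9ce4_8422_2325) // FNV offset basis
    }
}

impl Hasher for Fnv1a {
    fn write(&mut self, bytes: &[u8]) {
        for &byte in bytes {
            self.0 ^= u64::from(byte);
            self.0 = self.0.wrapping_mul(0x0000_0100_0000_01b3); // FNV prime
        }
    }

    fn finish(&self) -> u64 {
        self.0
    }
}

/// A HashMap that uses the hasher above instead of the default one.
type FnvMap<K, V> = HashMap<K, V, BuildHasherDefault<Fnv1a>>;

fn word_counts(words: &[&str]) -> FnvMap<String, usize> {
    // With a custom hasher, the map is built with default() rather than new().
    let mut counts = FnvMap::default();
    for word in words {
        *counts.entry(word.to_string()).or_insert(0) += 1;
    }
    counts
}

fn push_vs_append() -> Vec<i32> {
    let mut items = vec![1, 2];
    items.push(3); // push adds a single element
    let mut more = vec![4, 5];
    items.append(&mut more); // append moves every element out of another Vec
    items
}
```

The point about append() is that it expects a `&mut Vec<T>` and empties it, which is why passing a single item produces a type error rather than a hint about push().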

PS thanks for the .display() tip; that was exactly what I needed.
