Failing to send data into a channel


#1

I’m using rust 1.24 on Win7-64.

I’m trying to process a bunch of files & to spread the processing over 4 threads. Here’s the relevant part of my code:

pub struct Query {
    pub case_sensitive: bool,
    pub is_regex: bool,
    pub pattern: String,
}

pub struct Reply {
    pub filename: String,
    pub pages: Vec<i32>,
}

pub fn process_files(query: Query, files: &Vec<PathBuf>) {
    let (sender, receiver) = mpsc::channel();
    for chunk in files.chunks(files.len() / 4) {
        let asender = mpsc::Sender::clone(&sender);
        thread::spawn(|| {
            for filename in chunk { // process_file returns Option<Reply>
                if let Some(reply) = process_file(&filename, query) {
                    asender.send(reply).unwrap();
                }
            }
        });
    }
    for reply in receiver {
        println!("{}: {:?}", reply.filename, reply.pages);
    }
}

Unfortunately, it won’t compile. Here’s the error I get:

   Compiling searchpdfscmd v0.1.1 (file:///R:/rs/searchpdfscmd_mpc)
error[E0277]: the trait bound `std::sync::mpsc::Sender<search::handler::Reply>: std::marker::Sync` is not satisfied
  --> src\search\handler.rs:24:9
   |
24 |         thread::spawn(|| {
   |         ^^^^^^^^^^^^^ `std::sync::mpsc::Sender<search::handler::Reply>` cannot be shared between threads safely
   |
   = help: the trait `std::marker::Sync` is not implemented for `std::sync::mpsc::Sender<search::handler::Reply>`
   = note: required because of the requirements on the impl of `std::marker::Send` for `&std::sync::mpsc::Sender<search::handler::Reply>`
   = note: required because it appears within the type `[closure@src\search\handler.rs:24:23: 30:10 chunk:&&[std::path::PathBuf], query:search::handler::Query, asender:&std::sync::mpsc::Sender<search::handler::Reply>]`
   = note: required by `std::thread::spawn`

I don’t understand why Reply isn’t sendable.

PS I have a working version of this using rayon but on a 4 core machine it never uses more than 39% of the CPU (and the processing is all CPU-intensive). I also have a Python version (which uses multiprocessing) and is much faster because this one maxes out all the cores.


#2

use move ||.
This &std::sync::mpsc::Sender<search::handler::Reply> is the datatype it’s really complaining about.
You will also have to clone chunk. (or clone in the iterator.)

Not sure you could expect a speed up vs rayon. Maybe something else slowing it down.


#3

I’ve changed || to move || when creating the closure for spawn. And instead of cloning the vector, I move it into the function since I don’t need it afterwards. Now the only error I get is

    thread::spawn(move || {
    ^^^^^^^^^^^^^
: type must satisfy the static lifetime

So I still don’t know what I’m doing wrong.

I was hoping this would be faster than rayon because rayon clearly doesn’t make use of all the CPUs on Windows (even though it knows how many there are and even though I tell it to use a thread pool size of 4).

In the end I’ll probably do the equivalent of multiprocessing by splitting it into a ‘manager’ program and a ‘worker’ program and use json-rpc. But I’d like to understand this kind of threading anyway, as part of my learning rust.


#4

Can you share this version, along with some test data? I would like to have a closer look at it. There is obviously something fishy here, which should be better understood before going through the trouble of distributing work yourself (which is very likely to be unnecessary here).

My first guess would be that you are somehow blocking the Rayon threads (via either IO or synchronization), as that is the single most common scalability problem with thread pool based parallelization libraries. But I would need to see the full code (including process_file) to be sure.

Can you also give me an idea of the current performance difference between the two versions, by timing them on the same task on your machine? CPU utilization differences are not enough on their own, since for all I know all these extra CPU cycles might go into CPython interpreter overhead or into data serialization/deserialization and interprocess communication.

That sounds like a needlessly heavyweight solution for such a simple problem. You can probably do much better with threads. So let us figure out why your current thread-based solution does not work very well :slight_smile:


#5

The closure cannot borrow anything (beyond 'static references) from its environment. I don’t know what your current code looks like but if it’s essentially the same as what you pasted before (only with move there), then it’s the chunk value; it borrows from the slice (Vec here).

If this isn’t it, then paste the code and we’ll figure it out.


#6

Hi Hadrien,

I can’t share it because the DLL that does most of the CPU-bound work is a commercial one that I’m not allowed to share the license key of. Nor can I share the PDFs that are processed since they belong to customers.

I’m not using mutexes myself or any other explicit sync mechanism. Also I have this example in Go & that performs poorly too. I suspect that the problem is that Go & Rust are not utilizing all the 4 cores (maybe this is expensive on Windows?).

I can show the Rust code of course that uses rayon:

pub struct Query<'a> {
    pub case_sensitive: bool,
    pub is_regex: bool,
    pub pattern: &'a str,
}

pub struct Reply<'a> {
    pub filename: &'a Path,
    pub pages: Vec<i32>,
}

pub fn process_files(query: &Query, files: &Vec<PathBuf>) {
    let total: u64 = files.par_chunks(files.len() / 4)
        .map(|filenames| process_some_files(filenames, &query)).sum();
    println!("total {}", total);
}

fn process_some_files<'a>(filenames: &'a [PathBuf], query: &'a Query)
                          -> u64 {
    let mut count = 0;
    for filename in filenames {
        if let Some(reply) = process_file(&filename, &query) {
            println!("{}: {:?}", reply.filename.display(), reply.pages);
            count += 1;
        }
    }
    count
}

fn process_file<'a>(filename: &'a PathBuf,
                    query: &'a Query) -> Option<Reply<'a>> {
    let mut pages: Vec<i32> = Vec::new();
    if let Some(mut dpl) = dpl::Dpl::new() {
        if let Ok(mut pdf) = dpl.da_open_file_readonly(
                &filename.to_string_lossy()) {
            // ASSUMES is_regex == true & that we want every matching page
            let rx = Regex::new(query.pattern).unwrap();
            for pnum in 1..pdf.da_get_page_count() + 1 {
                if pdf.da_set_page(pnum) {
                    let text = pdf.da_extract_page_text();
                    if rx.is_match(&text) {
                        pages.push(pnum);
                    }
                }
            }
            if !pages.is_empty() {
                return Some(Reply{filename, pages});
            }
        } // Pdf is closed here due to being dropped out of scope
    } // Dpl is released here due to being dropped out of scope
    None
}

the dpl module is a wrapper around a C DLL and uses sharedlib. I have to create each Dpl object (wrapper for the DLL) in its own thread.

Of course this version just prints as it goes rather than accumulating results but I did it earlier than the one using the thread module that I’m trying to get working now. (I also tried using the cpupool crate but gave up due mostly to the docs & lack of egs.)

This takes 10-11 minutes to process 192 files of which most have no results at all. Rust mostly uses 1 core for this even using rayon which knows the machine has 4 cores.

Python with multiprocessing takes < 3 minutes & utilizes all 4 cores. I don’t expect Rust to be faster because 99% of the work is done in the DLL – but it shouldn’t be slower!

Go takes 11-12 minutes using goroutines (but mostly uses just one core.) I can get Go under 4 1/2 mins by having a manager + worker exes and taking a mulitiprocessing approach.

I’m going to try the multiprocessing approach with Rust too, unless I can get the threaded speed up (i.e., for the threads to utilize all the cores).

I don’t care that I’m using a hammer to break a nut since what I’m trying to establish is how to do heavy CPU-bound processing maxing out all cores. Once I have a framework for this I can then apply it to bigger programs.

Anyway, I’m really enjoying learning Rust!


#7

Hi vitalyd,

Here’s the code in question:

pub fn process_files(query: &Query, files: Vec<PathBuf>) {
    let (sender, receiver) = mpsc::channel();
    for chunk in files.chunks(files.len() / 4) {
        let asender = mpsc::Sender::clone(&sender);
        thread::spawn(move || {
            for filename in chunk { // process_file returns Option<Reply>
                if let Some(reply) = process_file(&filename, query) {
                    asender.send(reply).unwrap();
                }
            }
        });
    }
    for reply in receiver {
        println!("{}: {:?}", reply.filename, reply.pages);
    }
}

structs same as before. I tried doing let chunk = chunk.clone(); just before the spawn but it didn’t help.


#8

You also have query being captured - that won’t work. Can you clone it? Or put it into a Rc and then move clones of that into the closure.


#9

As for low cpu utilization, are you sure the C code isn’t using a long-held mutex? Python multiprocessing would avoid that because you have process-level isolation, and mutex contention wouldn’t be an issue.


#10

Hi vitalyd,
That helps a bit but still doesn’t work. Here’s the new code & new error:

pub fn process_files(query: &Query, files: Vec<PathBuf>) {
    let (sender, receiver) = mpsc::channel();
    for chunk in files.chunks(files.len() / 4) {
        let query = query.clone();
        let asender = mpsc::Sender::clone(&sender);
        let chunk = chunk.clone();
        thread::spawn(move || {
            for filename in chunk { // process_file returns Option<Reply>
                if let Some(reply) = process_file(&filename, &query) {
                    asender.send(reply).unwrap();
                }
            }
        });
    }
    for reply in receiver {
        println!("{}: {:?}", reply.filename, reply.pages);
    }
}

I put #[derive(Clone)] on the Query struct.
Here’s the new error:

R:\rs\searchpdfscmd_mpc>cargo run --release v:\pdfs
   Compiling searchpdfscmd v0.1.1 (file:///R:/rs/searchpdfscmd_mpc)
error[E0597]: `files` does not live long enough
  --> src\search\handler.rs:23:18
   |
23 |     for chunk in files.chunks(files.len() / 4) {
   |                  ^^^^^ borrowed value does not live long enoug
...
38 | }
   | - borrowed value only lives until here
   |
   = note: borrowed value must be valid for the static lifetime...

I have to finish for tonight, but will resume tomorrow – thanks for your help! (I also tried cloning files, but that still had the lifetime error).


#11

Hmm, I should’ve mentioned this in my previous reply: chunks.clone() won’t help, it just clones itself but the clone still has a reference to the original slice (Vec).

You’ll probably want to put the Vec into an Arc, pass clones of the Arc to the closure, and then have the closure iterate over a subset of the Vec (pass in an extra arg that specified the range for the closure to use). Or split the Vec into sub-Vecs upfront and then move those sub-Vecs to the corresponding worker. But the main idea, no matter how you implement it, is your closure cannot have references to anything not 'static.


#12

Thanks: I’ll try the Arc approach tomorrow.

I doubt the C code has a mutex (but I don’t have access to the source), but I’ll ask the people who sell it.

I’ve now tried using Arc::new() on files + using one regex + creating just one Dpl per thread. The code seems to have the same runtime performance as the rayon version, but doesn’t seem to terminate:

pub fn process_files(query: &Query, files: Vec<PathBuf>) {
    let (sender, receiver) = mpsc::channel();
    let rx = Regex::new(&query.pattern).unwrap();
    let files = Arc::new(files);
    let size = files.len() / 4;
    for i in 0..4 {
        let asender = mpsc::Sender::clone(&sender);
        let chunk = files.clone();
        let arx = rx.clone();
        thread::spawn(move || {
            let start = i * size;
            let end = if i == 3 { chunk.len() } else { start + size };
            let mut dpl = dpl::Dpl::new().unwrap();
            for filename in &chunk[start..end] {
                let filename = filename.to_string_lossy().to_string();
                if let Ok(mut pdf) = dpl.da_open_file_readonly(&filename) {
                    if let Some(reply) = process_file(filename, &mut pdf,
                                                      &arx) {
                        asender.send(reply).unwrap();
                    }
                }
            }
        });
    }
    for reply in receiver {
        println!("{}: {:?}", reply.filename, reply.pages);
    }
}

fn process_file(filename: String, pdf: &mut dpl::DaPdf, rx: &Regex)
                -> Option<Reply> {
    let mut pages: Vec<i32> = Vec::new();
    for pnum in 1..pdf.da_get_page_count() + 1 {
        if pdf.da_set_page(pnum) {
            let text = pdf.da_extract_page_text();
            if rx.is_match(&text) {
                pages.push(pnum);
            }
        }
    }
    if !pages.is_empty() {
        return Some(Reply{filename, pages});
    }
    None
}

For comparison the rayon code seems much easier to understand and is 32 LOC vs. 45 for manually threading.

However both seem to use < 25% of CPU most of the time.

Note though that I was wrong about the way they utilize CPUs: In fact they both use all 4 CPUs but underutilize them all rather than maxing out one which is what I mistakenly said they did.

This now makes me suspicious that vitalyd’s suggestion that maybe the DLL has a performance-inhibiting mutex may be correct: I’m still waiting to hear from the DLL’s makers about that.

Anyway, thanks to vitalyd and Hadrian I now have two working versions and have learnt a lot more about threading in rust.

My main takeaway is that rayon is the way to go for this. (I just wish there was an equivalent of Python’s multiprocessing module for Rust, although maybe that isn’t possible.)

Anyway, next I’ll try to do the same thing using a ‘manager’ and ‘worker’ exes & see what performance that produces.

Thank you.


#13

Thanks for the clarifications and for sharing as much code as you can! Let’s see what we can do with what we have here…

The first thing which strikes me as strange is this:

pub fn process_files(query: &Query, files: &Vec<PathBuf>) {
    let total: u64 = files.par_chunks(files.len() / 4)
        .map(|filenames| process_some_files(filenames, &query)).sum();
    println!("total {}", total);
}

fn process_some_files<'a>(filenames: &'a [PathBuf], query: &'a Query)
                          -> u64 {
    let mut count = 0;
    for filename in filenames {
        if let Some(reply) = process_file(&filename, &query) {
            println!("{}: {:?}", reply.filename.display(), reply.pages);
            count += 1;
        }
    }
    count
}

It is my understanding that this use of par_chunks more or less forces Rayon to cut the array of filenames in exactly 4 chunks, and hand over each chunk to a worker thread, which will be forced in turn to process it from start to finish without any other work available after that.

So if there is any load imbalance in the input dataset, it will propagate into the workload of worker threads. Which may particularly relevant in our case, since you also suggested that the input load can be quite imbalanced:

This takes 10-11 minutes to process 192 files of which most have no results at all.

Is there any reason why you do not let Rayon partition the data across worker threads however it wishes, by using the regular par_iter() method like this?

// DISCLAIMER: I did not check if this code compiles, it may require adjustments
pub fn process_files(query: &Query, files: &Vec<PathBuf>) {
    let total =
        files.par_iter()
             .filter_map(|filename| process_file(filename, query))
             .inspect(|reply| {
                  println!("{}: {:?}", reply.filename.display(), reply.pages);
              })
             .count();
    println!("total {}", total);
}

In this version of process_files, Rayon’s worker threads are allowed to freely steal work from each other whenever they run out of work. This should result in better load balancing performance, and avoid worker thread starvation, which may be another cause of low CPU utilization.


Another thing, which is not parallelization-related but may reduce program efficiency, is that you compile a Regex once per input file inside of process_file. Compiling a regex can be expensive, so you may want to do it once at the beginning of processing, and cache the resulting Regex object inside of your Query struct.

For similar reasons, I am wondering if there wouldn’t be a benefit in spawning one thread-local DPL per worker thread too, instead of spawning one per file. That can be another optimization worth investigating after we have the CPU utilization matter sorted out.


Yet another thing to consider is that if most of your files are empty and some files are highly compute-intensive, you may need to also parallelize across the pages of a PDF, rather than just across PDF files, in order to keep your worker threads busy. But I don’t think that your Python version did this, so this is unlikely to be the heart of the issue here.


Speaking of which, can you share a similar portion of the multiprocessing-based Python code? Just to check if there is anything which it does obviously better than the Rust version…


#14

Your process_files worked as-is, but I did change it to create the regex once & pass a ref to it:

pub fn process_files(query: &Query, files: &Vec<PathBuf>) {
    let rx = Regex::new(query.pattern).unwrap();
    let total = files.par_iter()
        .filter_map(|filename| process_file(filename, &rx))
        .inspect(|reply| {
            println!("{}: {:?}", reply.filename.display(), reply.pages);
        }).count();
    println!("total {}", total);
}

Early on in the run it peaked at 45% CPU usage; but for most of the time it was < 25% as before & didn’t run any faster than before.

You’re right that it would be better to create a Dpl object once per thread, but I don’t know how to do that when using par_iter(). This was one of the reasons I was trying to launch my own threads.

I don’t think the Python version will be much help since it uses the concurrent.futures module, but here is the main function:

def search(findInfo, responding, foundOne, finished, state):
    start = time.monotonic()
    futures = set()
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for filename in _find_files(findInfo):
            if state.value == int(StateKind.IDLE):
                executor.shutdown()
                break
            futures.add(executor.submit(_search, filename, findInfo, state))
        else: # If we successfully added all the searches...
            tickOn = time.monotonic()
            while futures:
                if state.value == int(StateKind.IDLE):
                    executor.shutdown()
                    break
                future = futures.pop()
                if future.done():
                    err = future.exception()
                    if err is None:
                        foundInfo = future.result()
                        if foundInfo.pages is not None:
                            if foundInfo:
                                foundOne(foundInfo)
                    else:
                        logging.warning(err)
                else:
                    futures.add(future)
                tickOff = time.monotonic()
                if (tickOff - tickOn) > 0.5:
                    tickOn = tickOff
                    QThread.msleep(100)
                    responding()
    finished(secs=time.monotonic() - start)

The _search function reads the PDF like my rust process_file function does.


#15

Hmmm… So it looks like you do get parallelism in the beginning, but a long sequential tail at the end. Wondering what this is all about.

I wish I could have a closer look at what this program is doing with a time-resolved profiling tool, but sadly you are not allowed to share enough for me to do so… If you want to investigate on your side, though, WPA (Windows Performance Analyzer) and its predecessor xPerf were two nice Windows profiling tools last time I developed on that operating system.


As a cruder approach, you can make 4 copies of each PDF file and have your program analyze them, arranging the vector of filenames in such a fashion that you get the first copy of every file, then the second copy of every file, and so on. This should make each of the Rayon worker threads go through the entire input dataset (if you want to guarantee this, you can reuse the par_chunks() approach that you had in the beginning).

If this results in significantly increased CPU utilization, then load imbalance in the input dataset is likely to be the issue, and we can look into parallelizing the processing of individual files to compensate (I have some ideas for that based on rayon::scope()). If the CPU utilization pattern remains more or less the same, then I would look more in the direction of synchronization and IO bottlenecks.

The reason why I suggest that you make full-blown file copies, instead of just duplicating filenames, is that I’m wary of the underlying PDF processing library silently holding file-based locks. But you can try that approach as a first step if you like.


Concerning possible synchronization bottlenecks, note that there is one implicit lock in your current code, which is println!(). Standard output is synchronized in Rust, so if your program prints a lot of output, then you could be bottlenecked by that lock. To check if that is the issue here, you can try to temporarily remove the inspect() statement in process_files:

pub fn process_files(query: &Query, files: &Vec<PathBuf>) {
    let rx = Regex::new(query.pattern).unwrap();
    let total = files.par_iter()
        .filter_map(|filename| process_file(filename, &rx))
        .count();
    println!("total {}", total);
}

Well, the issue at hand here is that Rayon is built to hide the worker threads from you, but you actually want it to do something once per worker thread. As your Dpl object’s constructor takes no parameter, this sounds like one of the few legitimate uses for thread-local storage:

// Declare thread-local storage and specify how to initialize it
thread_local! {
    // Need a RefCell here because we want to mutate the Dpl and the Rust
    // compiler cannot prove the absence of shared mutable borrows at
    // compile time when "static" thread-local state is used.
    static DPL: RefCell<dpl::Dpl> = RefCell::new(dpl::Dpl::new().unwrap());
}

// ...

// Use the thread-local storage in your code (will lazily initialize it)
DPL.with(|dpl_cell| {
    // Acquire a mutable reference to the RefCell'd DPL object
    let dpl: &mut dpl::Dpl = dpl_cell.borrow_mut();

    // Now you can do your PDF processing as before!
});

Indeed, I do not see anything in the part of the Python code that you shared which looks obviously much more clever than what the Rust code does.

One interesting part is that the algorithm used for polling futures is likely to generate high CPU utilization in the master process, because it essentially spins for 0.5s, then falls asleep for 0.1s and starts over, which means that the master process is consuming CPU 5/6 of the time even when nothing is happening. But this is not enough to explain why the worker processes have high CPU utilization as well, let alone why they are going faster.

Can you share a bit more of the _search function? I need to know more precisely what the worker processes are doing in the Python version in order to compare it with the Rust version.


EDIT: Oh, and one last thing. To check if you have an IO issue on your hands, you may want to ask Rayon to oversubscribe the CPU a bit:

rayon::ThreadPoolBuilder::new().num_threads(8).build_global().unwrap();

If this improves the CPU utilization and running time, then the PDF library is likely blocking for IO. If the CPU utilization remains similar or running times worsen, then a synchronization or load imbalance bottleneck is more likely.


What is the best way to allocate one task per cpu?
#16

I tried using static dpls & got rid of the println! inside the threads. This produced an early peak of over 50%, but then soon diminished back to < 25%. So I’m wondering if it is as you or vitalyd suggested and a mutex or file lock in the DLL itself.

Anyway here’s the Rust that now starts better but is still slow:

pub fn process_files(query: &Query, files: &Vec<PathBuf>) {
    rayon::ThreadPoolBuilder::new().num_threads(8).build_global().unwrap();
    let rx = Regex::new(query.pattern).unwrap();
    let replies: Vec<Reply> = files.par_iter()
        .filter_map(|filename| process_file(filename, &rx)).collect();
    for reply in replies {
        println!("{}: {:?}", reply.filename.display(), reply.pages);
    }
}

fn process_file<'a>(filename: &'a PathBuf, rx: &'a Regex)
                    -> Option<Reply<'a>> {
    thread_local! {
        static DPL: RefCell<dpl::Dpl> = RefCell::new(
            dpl::Dpl::new().unwrap());
    }
    let mut pages: Vec<i32> = Vec::new();
    DPL.with(|dpl_cell| {
        let dpl: &mut dpl::Dpl = &mut dpl_cell.borrow_mut();
        if let Ok(mut pdf) = dpl.da_open_file_readonly(
                &filename.to_string_lossy()) {
            // ASSUMES is_regex == true & that we want every matching page
            for pnum in 1..pdf.da_get_page_count() + 1 {
                if pdf.da_set_page(pnum) {
                    let text = pdf.da_extract_page_text();
                    if rx.is_match(&text) {
                        pages.push(pnum);
                    }
                }
            }
            if !pages.is_empty() {
                return Some(Reply{filename, pages});
            }
        }
        None
    })
}

And here’s the Python _search function you asked to see:

def _search(filename, findInfo, state):
    if state.value == int(StateKind.IDLE):
        return _foundInfo(filename)
    pages = []
    try:
        with PDF.FileRO(filename) as pdf:
            if state.value == int(StateKind.IDLE):
                return _foundInfo(filename, pages)
            for pnum in range(1, len(pdf) + 1):
                if state.value == int(StateKind.IDLE):
                    return _foundInfo(filename, pages)
                text = pdf.text(pnum)
                if findInfo.regex.search(text) is not None:
                    pages.append(pnum)
                if findInfo.onlyfirst and pages:
                    return _foundInfo(filename, str(pages[0]))
    except PDF.Error as err:
        return _foundInfo(filename, str(err), False)
    return _foundInfo(filename, pages)

I’m going to try the manager + worker exes approach now. My idea being to use rayon to start up 4 threads each using std::process to execute a worker exe and gather its output.


#17

Thanks! Unfortunately, I did not find anything significantly different from the Rust version in the part of the Python version that you can show, so in my eyes that only leaves the following possibilities:

  • There is a load imbalance issue with the input file list, which Rayon does not manage to resolve on its own for some reason (I doubt it is this explanation, but did you try my test of making 4 copies of each file just to check?)
  • The Rust and Python wrappers to the PDF library somehow work differently under the hood.
  • The PDF library performs some harmful thread synchronization internally, which Python does not see thanks to multiprocessing

Good luck with the multiprocess version!


#18

FWIW, based on everything I’ve read in this thread (including the Go experiment, which I missed earlier), I’m pretty convinced the C lib has a highly contended mutex.

Can you trace syscalls?


#19

I think the problem is that some files take v. little time to process but two or three take absolutely ages.

I’ve rerun the Python version with the same version of the PDF library & it is now slower, coming in at < 5 mins. The Rust version using a multiprocessing approach takes < 4 mins. This is very pleasing since 99% of the work is done in the DLL; it means that even with starting subprocesses Rust has much lower overhead that Python here.

I originally tried starting 4 threads each to run a subprocess and giving 1/4 of the filenames to each subprocess. But this turned out to be too coarse-grained (i.e., if one subprocess got 2 huge files to work on, it killed performance).

Anyway, here is the worker in one main.rs file:

extern crate dpl;
extern crate regex;

use regex::Regex;
use std::io::{self, BufRead};

pub struct Reply {
    pub filename: String,
    pub pages: Vec<i32>,
}

fn main() {
    let (pattern, files) = read_files();
    let rx = Regex::new(&pattern).unwrap();
    let mut dpl = dpl::Dpl::new().unwrap();
    for filename in &files {
        match dpl.da_open_file_readonly(&filename) {
            Ok(mut pdf) =>
                if let Some(reply) = process_file(&mut pdf, &rx,
                                                  &filename) {
                    println!("{}\t{:?}", reply.filename, reply.pages);
                },
            Err(err) => println!("?ERROR\t{}", err),
        }
    }
}

fn read_files() -> (String, Vec<String>) {
    let mut pattern = String::new();
    let mut files = Vec::new();
    let stdin = io::stdin();
    for line in stdin.lock().lines() {
        let line = line.unwrap();
        if line.starts_with('?') {
            pattern = line.trim_left_matches('?').to_string();
        } else {
            files.push(line.to_string());
        }
    }
    (pattern, files)
}

fn process_file(pdf: &mut dpl::DaPdf, rx: &Regex, filename: &str)
                -> Option<Reply> {
    let mut pages: Vec<i32> = Vec::new();
    for pnum in 1..pdf.da_get_page_count() + 1 {
        if pdf.da_set_page(pnum) {
            let text = pdf.da_extract_page_text();
            if rx.is_match(&text) {
                pages.push(pnum);
            }
        }
    }
    if !pages.is_empty() {
        return Some(Reply{filename: filename.to_string(), pages});
    }
    None
}

It can accept multiple files because that’s how I started, but now I only give it one file to process each time.

And here’s the code that calls it in the manager:

pub struct Reply {
    pub filename: String,
    pub pages: String,
}

// 237 sec, 3'57" Rust vs. 4'58" Python
pub fn process_files(query: &Query, files: &Vec<PathBuf>) {
    const MAX_PROCESSES: usize = 8;
    rayon::ThreadPoolBuilder::new().num_threads(MAX_PROCESSES)
        .build_global().unwrap();
    for reply in files.par_iter()
            .filter_map(|filename| process_file(filename, query.pattern))
            .collect::<Vec<Reply>>() {
        println!("{}: {}", reply.filename, reply.pages);
    }
}

fn process_file<'a>(filename: &'a PathBuf, pattern: &'a str)
                    -> Option<Reply> {
    let mut child = Command::new(
        "r:/rs/searchpdfsworker/target/release/searchpdfsworker.exe")
        .stdin(Stdio::piped())
        .stdout(Stdio::piped())
        .spawn()
        .expect("failed to start worker");
    {
        let stdin = child.stdin.as_mut().expect("failed to talk to worker");
        writeln!(stdin, "?{}", pattern).unwrap();
        writeln!(stdin, "{:?}", filename).unwrap();
    }
    let results = child.wait_with_output().expect("failed to hear worker");
    if !results.stdout.is_empty() {
        for line in String::from_utf8(results.stdout).unwrap().lines() {
            let parts: Vec<&str> = line.splitn(2, '\t').collect();
            return Some(Reply{filename: parts[0].to_string(),
                            pages: parts[1].to_string()});
        }
    }
    None
}

So in the end I think that the problem I hit was those 2 or 3 huge files killing performance. It is also clear that rayon is a really good crate.

One problem I have with the code is that somehow the filenames ended up with 's doubled and quoted, so v:\pdfs\boson1.pdf became "v:\\pdfs\\boson1.pdf" which isn’t valid for Windows. I put in a crude fix in dpl.rs but I’ll try to figure out where this came from.

Thank you both very much for all your help!


#20

If load imbalance is the issue, then multiprocessing (vs multithreading) shouldn’t help unless I’m missing something. Your total time will still be dominated by some poor worker(s) that get stuck with heavy files.

It’s because you’re printing Debug output to the child’s stdin, which will have escape characters. You can make this change:

// Change this
writeln!(stdin, "{:?}", filename).unwrap();
// to
writeln!(stdin, "{}", filename.display()).unwrap();