What is the best way to allocate one task per cpu?

I've used rayon to do some small optimizations & this works great.

However, I now have a more challenging problem. I create n tasks where n is the number of CPUs, but I don't know what n will be in advance. (Well, it will always be >2 since I special case 1 or 2.)

But how do I get rayon (or thread or ...) to give each of these tasks to exactly one CPU per task & then gather the results when they're all done? Each task returns a Result<Vec<Item>> (I use the error-chain crate).

The reason I need to have one CPU per task is that each task accesses a DLL in C that will work within one thread but not across threads: so in each task I use the sharedlib crate to get pointers to the DLL's functions.

Thanks.

Reading your problem description, I understand why you want at most one CPU per task, but not why you want at least one CPU per task, which your use of "exactly" implies. Can you clarify this a bit?

https://crates.io/crates/num_cpus

It really depends on how the code in DLL works to say whether it is safe to use rayon or manually creating your own therads (or pool) or even if this will fail.

I create exactly as many tasks as there are CPUs (I use the num_cpus crate).

This is because each task must use sharedlib to access a DLL, and the DLL's functions must all be called within the same thread (otherwise the DLL will 'explode').

Hope this clarifies?

I think I am still misunderstanding a critical aspect of your problem statement. Can you clarify why the obvious solution would not work?

tasks.par_iter()
     .map(process_task)
     .collect()

In this code, unless you recursively spawn more Rayon task, process_task will run from start to finish in a single OS thread from the Rayon thread pool... so what can go wrong?

(PS: If you have some DLL initialization code that must be run no more than once per thread, or has a significant performance cost, you may want to use a thread_local inside of process_task for that)

I've done it as you suggested:

// This is somewhat simplified but is true to the structure
fn items(query: &Query) -> Result<Vec<dpl::Item>> {
    let mut tasks = create_tasks(&query);
    let mut items: Vec<dpl::Item> = vec![];
    let item_vecs: Vec<_> = tasks.par_iter().map(
        |task| process_task(task)).collect();
    for task_vec in item_vecs.iter() {
        for vec in task_vec.iter() {
            for item in vec.iter() {
		items.push(item);
	    }
	}
    }
    Ok(items)
}

The runtime of this is +- 0.1sec exactly the same as the non-concurrent version. Not surprising since both the concurrent & non-concurrent only max out one CPU. (Yet the smaller concurrency things I did with rayon all produced 100% speedups.)

Maybe the DLL serializes all work given to it by multiple threads. I've not heard back about this.

So tomorrow I'll try creating num_cores subprocesses and seeing if that's any better.

Note though that I think that rayon is superb. Really easy to use and for pure Rust it works brilliantly.

And so, the mysterious Dpl sequential bottleneck strikes again... :cry: Indeed, if it is like the last time, I suspect that you will need to go multiprocess again in order to work around it. How unpleasant.

Also, note that as currently written, your code may swallow errors:

  • IIUC, item_vecs is a Vec<Result<Vec<dpl::Item>>>
  • So for task_vec in item_vecs.iter() { iterates over the inner Results...
  • ...and for vec in task_vec.iter() { tries to unwrap the inner result, and does nothing if an error is returned (instead of reporting the error to the caller).

I am not sure how you want to handle this. There are multiple possible strategies depending on your requirements:

  1. Ignore the errors altogether (what the code currently does)
  2. Report only one of the encountered errors (what is most frequently done)
  3. Try to merge all reported errors into a single one (harder, but more satisfying)

Also, note that you can do some of the results-merging in parallel inside of Rayon. collect() itself is clever enough to do some simple merging like turning an iterator of Result<T> into a Result<Vec<T>> if requested to, but for more complex forms of merging you may need to write your own call to reduce() or fold().

I took out the error handling because it wouldn't compile & I just wanted to get it working. Certainly I don't know how to handle errors in rayon. There doesn't seem to be a rayon::Error that I can add as a 'foreign' error in error-chain and I can't use ? in par_iter() etc. Nor do the rayon docs seem to show how to cope with error values. So although tomorrow I'll try multiprocessing, I would really like advice or links to how to cope with using rayon with Vec<Result<Item>> results.

Oh and DPL is the 'Debenu (now Foxit) Quick PDF library'. It is a commercial PDF library. One day I hope to replace it with an LGPL library in my commercial apps but there isn't a suitable one I know of. Poppler's license precludes commercial use, Xpdf is far too expensive (and Adobe's even more expensive).

That is perfectly okay :slight_smile: I just wanted to point out the error-swallowing behaviour to you in case it wasn't done on purpose. I know that I spent a couple of minutes looking at that triple for loop and wondering where the third level of iteration came from myself, before remembering that Result is an iterable.

The reason why Rayon does not provide its own error type is that the library is intended to be quite "transparent" when it comes to errors. Almost all Rayon methods are considered infaillible, and panics from the inside of a parallelization construct are propagated to the outside. In general, the only errors that you need to deal with when using Rayon's constructs are the ones that originate from your code.

For these "inner" errors, as in many other areas, Rayon heavily mimicks the design of standard Rust iterators. Sadly, the design of the latter is also a little bit under-documented, and you often need to hunt a bit through the docs in order to find answers. Here is my personal cheat sheet:

  • The ? operator always returns from the innermost function. If that is a closure, it returns from the closure. So if you use ? in Iterator::map(), for example, it will inject the Results into your iterator chain instead of returning from the host as you may have expected. And this also means that you are not allowed to use ? in some Iterator contexts, such as within filter().
  • Iterator::flat_map() is not error-related, but a very convenient iterator method when you have something like an Iterator of Vec on your hands. It starts like map(), with the additional requirement that the function produces an iterable (that is, a type which implements IntoIterator), and in the end it produces a merged iterator combining the items of all intermediary iterators.
  • Iterator::collect() has quite a number of tricks up its sleeves. It can actually do a lot more than just accumulating individual items into collections. Anywhere a FromIterator implementation is defined, collect() can use that to collect individual iterator items into a single place. This includes...
    • ...collecting &strs into a String more efficiently than flat_map can.
    • ...collecting Option<T>s into an Option<Vec<T>>, which is None if at least one item was None, and the vector of item contents otherwise.
    • ...and, of interest here, collecting Result<T, E>s into a Result<Vec<T>, E> which reports the first observed error, if any, or the vector of successful results otherwise.
  • For cases when collect() still does not fit the bill, you can replace it with a hand-written reduction function. This is where the API of Rayon starts diverging a bit from that of standard iterators, for performance reason: standard accumulation is not parallelizable, you need to use slightly different algorithms in order to perform parallel reduction.

From your description, your tasks return a Result<Vec<dpl::Item>>, so you have an Iterator of Result<Vec<dpl::Item>> on your hands. As I hinted at before, the first thing that you need to define is, what do you want to have in the end?

  • Is it okay to only report a single error if multiple errors occured?
  • Do you want to exhaustively report every error that occured?

In the former case, you can often let Iterator::collect() do the work for you. In the latter case, you will need to design your error type in such a way that it can hold multiple error values at once, which can require deeper architectural changes. This is why unless being exhaustive is a hard requirement, people often go for the former approach: it is just a lot more convenient.

Let us assume that you go for this approach. For the usage scenario that we are discussing, I believe you will need to use a custom iterator reduction function, for the following reasons:

  • You cannot use collect() directly, as it would give you a Result<Vec<Vec<dpl::Item>>> instead of flattening the inner vectors.
  • You cannot use flat_map() to flatten the vectors before collecting, because it will just "flatten" the inner Results and give you an iterator of Vec<dpl::Item>, swallowing errors.

I would propose the following updated iterator chain, reduction included:

    tasks.par_iter()  // You may also want to investigate into_par_iter() here
         .map(process_task)  // No need for a closure here
         // And now, let's merge the Result<Vec<Item>>s!
         .reduce(|| Ok(Vec::new()),
                 |rv1, rv2| match (rv1, rv2) {
                     // If there are two Vecs, concatenate them
                     (Ok(mut v1), Ok(v2)) => {
                         v1.extend(v2);
                         Ok(v1)
                     },
    
                     // Otherwise, return the first error
                     (rv1 @ Err(_), _) => rv1,
                     (Ok(_), rv2 @ Err(_)) => rv2,
                 })  // Should return a single Result<Vec<Item>>

Thanks for clarifying! I went to have a look at their website, but could not find anything interesting pertaining to the peculiar multi-threaded behaviour that we are observing alas...

Thank you for a v. thorough explanation & the v. clever code for flattening the par_iter() results.

There is definitely some problem with DPL and threading. I've now switched to having two exes, a 'worker' that accepts JSON on stdin and sends JSON to stdout and a 'manager' which splits the work into tasks and uses rayon's par_iter() to hand off work. The function that handles each task creates a subprocess in which it runs the 'worker' exe. This runs 4x faster on a quad core machine and clearly maxes out all the cores.

For the errors, in my Reply struct I have an errors String which is empty if OK, otherwise has a message, so these are easy to handle. This has reduced my par_iter to just this:

    for mut reply in tasks.par_iter().filter_map(|task| get_reply(task))
            .collect::<Vec<Reply>>() {
        if !reply.error.is_empty() {
            bail!(reply.error);
        }
...

And it works great:-)

Thanks again for your help.