How to iterate over a Range in parallel with dynamic chunking


I have an algorithm that runs the same function on hundreds of millions of inputs (given as a Range), and I am trying to parallelise it efficiently. The runtime of the function varies widely with the input and cannot be predicted cheaply, so any static distribution of the workload will be unbalanced as well.

I am currently using a parallel rayon iterator with static chunking, i.e. iterate over inputs in chunks of size 10_000. This works well, but towards the end of the algorithm, those chunks that take longer than others will keep a few processors busy longer than needed, while the others are already idle.
If I make the chunks smaller I see a lot of red bars in htop, i.e. overhead from locking between the threads.

Now I could try to search for the "optimal" chunk size, but I want my algorithm to run on different machines and inputs with different runtime characteristics, so this is not possible.

Instead, I would like an iterator (or any other abstraction) which allows me to pass a Range as an input and processes it in parallel with my function. It should pass chunks of the input Range to each thread, and dynamically optimize the chunk size to keep it small, but also keep the overhead from locking close to zero.
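One known technique along these lines is guided self-scheduling (the idea behind OpenMP's `schedule(guided)`): each claimed chunk is a fraction of the work still remaining, so chunks start large and shrink towards the end of the Range. The sketch below is not a rayon feature. It uses only the standard library (scoped threads, Rust 1.63+), and the names `claim`, `for_each_guided` and `min_chunk`, as well as the divisor of 2, are assumptions made for illustration.

```rust
use std::ops::Range;
use std::sync::atomic::{AtomicU64, Ordering};
use std::thread;

/// Hypothetical helper: claims the next chunk from the shared cursor `next`.
/// The chunk size is remaining / (2 * threads), but at least `min_chunk`.
fn claim(next: &AtomicU64, end: u64, threads: u64, min_chunk: u64) -> Option<Range<u64>> {
    let mut start = next.load(Ordering::Relaxed);
    loop {
        if start >= end {
            return None; // nothing left to hand out
        }
        let remaining = end - start;
        let size = (remaining / (2 * threads.max(1)))
            .max(min_chunk.max(1)) // never hand out an empty chunk
            .min(remaining);
        // Lock-free claim: retry if another thread moved the cursor first.
        match next.compare_exchange_weak(start, start + size, Ordering::Relaxed, Ordering::Relaxed) {
            Ok(_) => return Some(start..start + size),
            Err(current) => start = current,
        }
    }
}

/// Hypothetical driver: runs `f` on every value of `range` using `threads` workers.
fn for_each_guided<F>(range: Range<u64>, threads: usize, min_chunk: u64, f: F)
where
    F: Fn(u64) + Sync,
{
    let next = AtomicU64::new(range.start);
    let end = range.end;
    thread::scope(|s| {
        for _ in 0..threads {
            s.spawn(|| {
                while let Some(chunk) = claim(&next, end, threads as u64, min_chunk) {
                    for i in chunk {
                        f(i);
                    }
                }
            });
        }
    });
}
```

Calling `for_each_guided(0..n, 60, 64, my_function)` would hand out chunks of roughly n/120 inputs at first and chunks of 64 inputs near the end. The shared state is a single atomic counter, so a claim costs one compare-and-swap rather than a queue lock. Whether that is cheap enough with 60 threads would have to be measured.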

Is there a feature in rayon I have not found that does this? Or does anyone know of a different crate that does this?

If you split the work into small chunks, rayon should distribute them across the few threads it has.

Thank you for the answer. I am not sure I fully understand what you mean.

For small dynamically distributed chunks, the locking overhead of the queue is too large. I am running with 60 threads, so quite a lot of them compete for the same queue. As a result, threads spend too much time waiting to access the queue.

On the other hand, if I distributed small chunks statically, I would run into problems with the heavily skewed runtime distribution of the chunks. Some evaluations of the function are fast, some are slow, and there might be patterns depending on other factors. So I would need to randomise the order first, which at the moment I am trying to avoid.

Please let me know if I'm understanding this right.
The input is an intermix of different types.
A function is being run on each of these different types of input, in parallel.
Compute time of each of these different types varies.
So you want to assess the heavy compute types and prioritise thread access for them over the lightweight compute types.

I don't know about run-time decision making, but what if the program records compute times for each of these types to a stats file? Then you can ration out the threads based on a simple weighted average of the compute times for the different types.

Ex: Type 1-3.
Type 1 takes 5 sec, and you have 300 'records'/function calls of this type.
Type 2 takes 100 sec, and you have 200 'records'/function calls of this type.
Type 3 takes 0.1 sec, and you have 1000 'records'/function calls of this type.

Assuming 100 available threads, you now have to split 1500 functions across 100 threads.

t1 = 5*300 = 1,500
t2 = 100*200 = 20,000
t3 = 0.1*1000 = 100

So t2 needs the most threads.

t1 = (1500/21600) * 100 = 6.94
t2 = (20000/21600) * 100 = 92.59
t3 = (100/21600) * 100 = 0.46

Keeping these stats refreshed in the stats file(s) is an exercise of its own (which I'm curious about too, for the proper implementation).

Seeing the above, you can now at least say:
t1 gets 6 threads, t2 gets 92 threads, t3 gets 2 threads.
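The arithmetic above can be sketched in Rust as follows. The function name `allocate_threads` is made up for illustration, and the rule for the leftover threads (hand them one at a time to the type that currently has the fewest) is only an assumption that happens to reproduce the 6 / 92 / 2 split. The post does not say how the rounding was done.

```rust
/// Hypothetical helper. `types` holds (seconds per call, number of calls) for
/// each type; the result is the number of threads assigned to each type.
fn allocate_threads(types: &[(f64, u64)], threads: u64) -> Vec<u64> {
    // Total work per type, e.g. 5 s * 300 calls = 1500.
    let work: Vec<f64> = types
        .iter()
        .map(|&(secs, calls)| secs * calls as f64)
        .collect();
    let total: f64 = work.iter().sum();
    if types.is_empty() || total <= 0.0 {
        return vec![0; types.len()];
    }

    // Proportional share of the threads, rounded down.
    let mut alloc: Vec<u64> = work
        .iter()
        .map(|w| (w / total * threads as f64).floor() as u64)
        .collect();

    // Assumed rule: leftover threads go to whichever type has the fewest so far.
    let mut left = threads.saturating_sub(alloc.iter().sum::<u64>());
    while left > 0 {
        let i = (0..alloc.len()).min_by_key(|&i| alloc[i]).unwrap();
        alloc[i] += 1;
        left -= 1;
    }
    alloc
}
```

With the numbers from the example, `allocate_threads(&[(5.0, 300), (100.0, 200), (0.1, 1000)], 100)` returns `[6, 92, 2]`.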

Obviously, my experience and knowledge are limited. So this seems like a solution to me, from what I know.
Any dynamic balancing would mean a full-on workload balancer.
That's the stuff of separate crates, as you mentioned.

Forgot to ask... how is it exactly that a Range is being used to identify/split types?
Is it a continuous range, or just whatever numbers exist between, say 1 to 500?

Thank you very much for the answer. Unfortunately there is no way to subdivide the inputs into different "compute types", as their performance is dependent on other factors that differ between executions and are too complicated to analyse.

The solution I am looking for would treat the inputs, functions and outputs as a black box and would simply try to make each chunk of input run for say 5 seconds. If it runs shorter, it gives the next thread a larger chunk, and if it is longer, it gives the next thread a smaller chunk.
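A minimal sketch of that feedback loop, using only the standard library (scoped threads, Rust 1.63+), could look like this. It is not an existing rayon API. The names `for_each_adaptive` and `MAX_CHUNK`, the starting chunk size of 1, and the rule of changing the size by at most a factor of two per step are all assumptions chosen for illustration.

```rust
use std::ops::Range;
use std::sync::atomic::{AtomicU64, Ordering};
use std::thread;
use std::time::{Duration, Instant};

/// Assumed upper bound on the chunk size. It also keeps the cursor far away
/// from overflowing u64 for realistic ranges.
const MAX_CHUNK: u64 = 1 << 20;

/// Hypothetical driver: runs `f` on every value of `range` using `threads`
/// workers, steering the chunk size so that one chunk takes about `target`.
fn for_each_adaptive<F>(range: Range<u64>, threads: usize, target: Duration, f: F)
where
    F: Fn(u64) + Sync,
{
    let next = AtomicU64::new(range.start); // first input nobody has claimed yet
    let chunk = AtomicU64::new(1); // current chunk size, shared by all workers
    let end = range.end;

    thread::scope(|s| {
        for _ in 0..threads {
            s.spawn(|| loop {
                let size = chunk.load(Ordering::Relaxed);
                // Claim [start, start + size) with a single atomic add, no lock.
                let start = next.fetch_add(size, Ordering::Relaxed);
                if start >= end {
                    break;
                }
                let stop = start.saturating_add(size).min(end);

                let timer = Instant::now();
                for i in start..stop {
                    f(i);
                }
                let secs = timer.elapsed().as_secs_f64().max(1e-9);

                // Too fast: next chunk is larger. Too slow: next chunk is smaller.
                // The factor is limited to [0.5, 2] so one outlier cannot swing it.
                let ratio = (target.as_secs_f64() / secs).clamp(0.5, 2.0);
                let new_size = ((size as f64 * ratio) as u64).clamp(1, MAX_CHUNK);
                chunk.store(new_size, Ordering::Relaxed);
            });
        }
    });
}
```

A call such as `for_each_adaptive(0..n, 60, Duration::from_secs(5), my_function)` would treat `my_function` as a black box. With a target of several seconds per chunk, each thread touches the shared counter only once every few seconds, so contention should be negligible. The price is that the last chunks can still leave some threads idle for up to roughly one target duration.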
