Thread pool for prioritized tasks


#1

What crate or combination of constructs should I use to execute tasks in parallel, starting them in a specific order of priority? Not all tasks are known in advance: new ones keep arriving as previous ones complete.

I’m scanning directories on disk and want to process the directories and their content in a specific order (I use a BinaryHeap to sort them by priority). There may be 100,000 directories queued for processing, so I’d prefer to minimize memory usage and only pop tasks from the binary heap when they’re about to be executed.

If I do a loop like

while let Some(item) = queue.pop() { run(item) }

I imagine two problems:

  1. if run is non-blocking, then the whole prioritization won’t work (the loop would immediately pop and start everything).
  2. the loop may run out of tasks while some operations are still running asynchronously; those operations may add new tasks to the queue later.

So I’m looking for something that will help me dequeue tasks bit by bit and wait until the queue is empty and all running tasks are finished. What would that be in Rust?


#2

This sounds like you want a semaphore to limit parallelism? Or a bounded channel that can put back pressure on the producer maybe?


#3

Probably. I’m thinking about something like this (plus appropriate locking):

loop {
    while let Some(item) = queue.pop() {
        send_via_sync_mpsc(|| threadpool.run(|| process(item)))
    }
    if threadpool.is_empty() { break; }
    // race: breaks too early if this check runs after the mpsc send
    // but before the threadpool has picked up the task
}

but these types of algorithms tend to have horribly subtle race conditions and logic errors, so I was hoping someone else has battled with this already :slight_smile:


#4

Ok so perhaps a countdown latch is more appropriate here. You’d need to know how many items are in the queue, set the countdown to that number, and then pass the latch to the tasks executed asynchronously on the pool. The current thread will wait on that latch, and the workers will count it down as they complete. That would handle your requirement of waiting for the batch to complete before moving on.

The latch can be implemented with a Mutex<u32> and an associated Condvar.

Alternatively, if you use the futures-cpupool crate, then you can join all the futures to be notified when all submitted work is done.