Unfair async semaphore

I am working on a distributed task engine, where each "worker" can receive tasks from multiple "jobs". I need the worker to strive for "fairness" -- i.e. at any point in time the number of running tasks that belong to job A should be more or less the same as the number that belong to job B. The worker has a limit on how many tasks can be running simultaneously.

Inside the worker there is a task (per job) that pulls tasks out of a "channel" and executes them. But before doing this (if there is a task in the channel) it grabs a permit from the semaphore (which gets released once the task is complete).

The Tokio async semaphore can't help me with my "fairness" goal. I think I need an "unfair" semaphore which works something like this:

  • to get a permit you need to provide a group id (job id, in my case)
  • semaphore keeps track of how many permits are currently held by each group
  • when a permit is released -- it finds the most "underrepresented" group (out of all waiters) and grants the permit to a waiter that belongs to that group
  • also, it would be nice to have priority within the group, i.e. if it is time to grant a permit to a waiter (of a given group) -- grant it to the one with the highest "priority" attribute

It looks like one of those bicycles you don't want to reinvent -- I wonder if there is a crate that already implements this logic? If not -- I'll be grateful for any advice on how to make one (I am pretty new to Rust).

Thank you.


No experience with Tokio, but I do have general experience with async processing.

Well... this has all the smells of a queue, right?

You can make an async method that picks "a" job, from a queue containing jobs, and then a task from a queue of tasks that the job has. Your async method would pick the job, pick the task and then execute the task for that job.

This way every job is dealt with the same "fairness".

And if you want to have different treatment based on different "customers", just make a "VIP queue" :)

The rules you mention seem to be "business related"(ish), so casual rules may demand a casual solution.

Not going to work. Imagine the case where the worker is running tasks belonging to jobs A and B. The task limit is 90. So (assuming both jobs always have tasks in the queue) each job is represented by 45 tasks right now. A new job C comes along -- the next 30 permits should go only to job C (assuming tasks for A and B finish at a similar rate), which leaves each job with 30.
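
To make the arithmetic concrete, here is a tiny simulation of that scenario. It is only a sketch under assumptions of my own: the function name `simulate` is made up, every job is assumed to always have waiters, and "finish at a similar rate" is modelled by letting the job that currently holds the most permits finish one task per step.

```rust
use std::cmp::Reverse;
use std::collections::BTreeMap;

/// Hypothetical model: `held` maps a job name to the number of permits it holds.
/// Each step, one task of the busiest job finishes and the freed permit goes to
/// the job holding the fewest permits (every job is assumed to have waiters).
/// Returns the job that received each freed permit, in order.
fn simulate(held: &mut BTreeMap<&'static str, usize>, releases: usize) -> Vec<&'static str> {
    let mut granted_to = Vec::with_capacity(releases);
    for _ in 0..releases {
        // The busiest job releases a permit (ties: alphabetically first name).
        let Some((busiest, n)) = held
            .iter()
            .map(|(name, n)| (*name, *n))
            .max_by_key(|&(name, n)| (n, Reverse(name)))
        else {
            break; // no jobs registered at all
        };
        if n == 0 {
            break; // nobody holds a permit, nothing can be released
        }
        held.insert(busiest, n - 1);

        // The most underrepresented job receives the freed permit.
        let (neediest, m) = held
            .iter()
            .map(|(name, n)| (*name, *n))
            .min_by_key(|&(name, n)| (n, name))
            .unwrap(); // safe: `held` is non-empty here
        held.insert(neediest, m + 1);
        granted_to.push(neediest);
    }
    granted_to
}
```

Starting from A = 45, B = 45, C = 0 and running 30 releases, every freed permit goes to C and the counts end at 30 / 30 / 30.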

Reality is even more complicated -- I have tasks that release their permits and wait for subtasks to complete. Once a subtask result arrives, they try to get the permit back (to continue). I.e. we have many places (each marked with a job id) trying to grab a permit, and the semaphore is supposed to give away permits while trying to maintain a balance.

I really need an object with a semaphore-like interface -- i.e. something shared (Rc<MySemaphore>) between all participants, with acquire_permit(&self, group_id) method(s).

I was about to suggest the same, something like a priority queue, where each entry is

(priority: u64, trigger: ...)

where each task yields and waits on a signal/send/something (which can be triggered by the 2nd element); then you encode whatever rules you have into updating the (priority: u64) part, and then it's a matter of:

loop {
    pop element from priority queue, run trigger
}

Hmm... I see. Basically something along these lines?

  • have a priority queue of all groups (that hold at least one permit or waiter), reverse ordered by the number of permits a given group currently holds; a permit stores its group id and dropping it causes the related group counter to decrease (potentially moving the group in the queue or removing it completely, if the counter is zero)
  • granting permit to a group-id causes related group entry counter to increase (again potentially moving it in the queue)
  • inside each group there is another priority queue (ordered by in-group rank) of waiters -- just need to make sure all waiters with same rank are treated fairly (i.e. form FIFO between themselves (same with groups, btw))
  • releasing permit also means looking for a group (in priority order) that has at least one waiter, then selecting first waiter in group's queue and waking it up
  • waking up can be done by means of sync::oneshot::channel<Permit> -- waiter waits on receiver, waker sends permit
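
A rough sketch of how the bullets above could fit together, not a finished implementation. Everything here is an assumption of mine: the names `FairSemaphore`, `Permit`, `acquire` and the `rank` parameter are made up; `std::sync::mpsc` stands in for `tokio::sync::oneshot` so the sketch has no dependencies (with Tokio the waiter would `.await` the receiver instead); `Arc` + `Mutex` replace `Rc`; the groups are scanned linearly rather than kept in a priority queue; and ties between groups are broken by group id, not FIFO.

```rust
use std::cmp::Reverse;
use std::collections::{BTreeMap, HashMap};
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, Mutex};

type GroupId = u32;
// Waiters of one group: highest rank first, FIFO (by sequence number) within a rank.
type WaitQueue = BTreeMap<(Reverse<u32>, u64), Sender<Permit>>;

#[derive(Default)]
struct State {
    free: usize,                          // permits nobody holds
    next_seq: u64,                        // arrival counter for FIFO ordering
    held: HashMap<GroupId, usize>,        // permits currently held per group
    waiting: HashMap<GroupId, WaitQueue>, // only groups with at least one waiter
}

pub struct FairSemaphore {
    state: Mutex<State>,
}

pub struct Permit {
    pub group: GroupId,
    sem: Arc<FairSemaphore>,
}

impl FairSemaphore {
    pub fn new(permits: usize) -> Arc<Self> {
        Arc::new(Self { state: Mutex::new(State { free: permits, ..Default::default() }) })
    }

    /// Returns a receiver that yields the permit once it has been granted.
    pub fn acquire(self: &Arc<Self>, group: GroupId, rank: u32) -> Receiver<Permit> {
        let (tx, rx) = channel();
        let mut st = self.state.lock().unwrap();
        if st.free > 0 {
            // Free permits exist only when nobody is waiting, so grant at once.
            st.free -= 1;
            *st.held.entry(group).or_insert(0) += 1;
            drop(st);
            let _ = tx.send(Permit { group, sem: Arc::clone(self) });
        } else {
            let seq = st.next_seq;
            st.next_seq += 1;
            st.waiting.entry(group).or_default().insert((Reverse(rank), seq), tx);
        }
        rx
    }
}

impl Drop for Permit {
    fn drop(&mut self) {
        let mut guard = self.sem.state.lock().unwrap();
        let st = &mut *guard;
        if let Some(n) = st.held.get_mut(&self.group) {
            *n -= 1;
        }
        // Most underrepresented group among those that have waiters.
        let next = st
            .waiting
            .keys()
            .copied()
            .min_by_key(|g| (st.held.get(g).copied().unwrap_or(0), *g));
        let Some(group) = next else {
            st.free += 1; // nobody is waiting: return the permit to the pool
            return;
        };
        let queue = st.waiting.get_mut(&group).unwrap();
        let (_, tx) = queue.pop_first().unwrap();
        if queue.is_empty() {
            st.waiting.remove(&group);
        }
        *st.held.entry(group).or_insert(0) += 1;
        // Unlock before sending: if the waiter is gone, the returned permit is
        // dropped right here and this function runs again for the next waiter.
        drop(guard);
        let _ = tx.send(Permit { group, sem: Arc::clone(&self.sem) });
    }
}
```

A caller would do something like `let permit = sem.acquire(job_id, rank).recv().unwrap();` and simply let `permit` go out of scope when the task is done.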

I am not sure if this will perform -- am I not supposed to do this via Future objects and the polling mechanism (instead of async fn and channels)? I am much more comfortable with channels than with rolling my own poll() logic, though...

Are you willing to pay the price of creating a channel PER async task you want in your custom priority system?

If so, for each task, create a channel and have the task block (trying to receive the msg).

Then, create a priority queue (of whatever priority you want) of the form (priority, other_side_of_channel).

Then, when you want to activate a task, send a () to other_side_of_channel, thereby unblocking the async task.
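
A minimal sketch of those three steps, with names I made up (`Entry`, `WakeQueue`, `park`, `wake_next`). It uses `std::sync::mpsc` purely as a stand-in so it runs without dependencies; in async code the parked task would await an async channel's receiver instead of blocking on it.

```rust
use std::cmp::Ordering;
use std::collections::BinaryHeap;
use std::sync::mpsc::{channel, Receiver, Sender};

/// One parked task: its priority plus the sending half used to wake it.
struct Entry {
    priority: u64,
    trigger: Sender<()>,
}

// Compare entries by priority only, so BinaryHeap (a max-heap) pops the
// highest priority first. The order among equal priorities is unspecified.
impl PartialEq for Entry {
    fn eq(&self, other: &Self) -> bool {
        self.priority == other.priority
    }
}
impl Eq for Entry {}
impl PartialOrd for Entry {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}
impl Ord for Entry {
    fn cmp(&self, other: &Self) -> Ordering {
        self.priority.cmp(&other.priority)
    }
}

#[derive(Default)]
struct WakeQueue {
    heap: BinaryHeap<Entry>,
}

impl WakeQueue {
    /// Registers a waiting task; the task blocks (or awaits) on the returned receiver.
    fn park(&mut self, priority: u64) -> Receiver<()> {
        let (trigger, rx) = channel();
        self.heap.push(Entry { priority, trigger });
        rx
    }

    /// Wakes the highest-priority task that is still waiting.
    /// Returns false if there was nobody left to wake.
    fn wake_next(&mut self) -> bool {
        while let Some(entry) = self.heap.pop() {
            // send fails if the task dropped its receiver; skip it and try the next one.
            if entry.trigger.send(()).is_ok() {
                return true;
            }
        }
        false
    }
}
```

Whatever fairness rules you have would go into how `priority` is computed before calling `park`.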

I don't know -- that is why I am asking if this will perform... Is there a better way?

This is the simplest solution I know of. Good luck.
