Futures execution prioritisation

I am trying to figure out the best approach to prioritisation of futures (requests being processed by a server). If I radically simplify the problem: I have a server (say, an actix web server with async request-processing functions) which processes incoming requests with async/await on a set of threads (could be just one thread). Each request can be one of 2 types. For example, I would like to separate them as follows (but the categories may change in the future): 1) requests with incoming data requiring an empty response, and 2) requests with empty incoming data but requiring responses full of data. The type of a request can be recognised very early on, when it is dispatched in a request-processing function (which is async). In most cases request processing involves either reading or writing files on disk.

When a host has got plenty of resources, I do not care about the prioritisation as all requests are processed with relatively low latency.

Now, when the host starves for resources (or when my server is required by a side-monitor to slow down a bit), I would like to make sure that requests of the 1st type take priority for execution. Specifically, futures constructed for processing 2nd-type requests are not executed until all 1st-priority requests are processed (or about to complete). In other words, I would like the latency for 1st-type requests to be as low as possible; the latency for 2nd-type requests can be unbounded (waiting even indefinitely, if necessary).

I have searched for Rust futures priority polling and task prioritisation queues, but I still struggle to find the best approach and the best combination of libraries to achieve the required behaviour.

Could you please advise how you would do it and what libraries you would use within the async function, which processes the requests?

The current approach I am using is counting in-flight (not yet completed) requests of the 1st type. When the counter drops to zero, it wakes up processing of the 2nd-type requests via a oneshot channel. It feels like a hack... Anything better available?

If I understand it correctly, there is no benefit to not processing type 1 requests once the corresponding connection has been accepted.
Type 2 requests could be held in a queue that is then fed to a threadpool, so you can limit how many are fed to the threadpool if needed.

Futures themselves have no concept of priority.

How can I hold them in a queue, and how can I manage what is fed to a thread pool? The executor from the web server polls all types of requests anyway...

This could be a reasonable feature of an async executor, but I don’t know of any that provide it. You’d specify the priority of a task when creating it, and it would always poll high-priority tasks first.

I’ve been thinking that this kind of setup could be useful for things like managing a graphics thread: you’ve got the frame rendering which needs to occur on a strict schedule, and also background tasks like loading data into graphics memory. If the executor knows which task is the soft-real-time one, it can keep the latency down while still performing background work in its free cycles.


This might be doable without any changes to the executor, via a custom queue of Futures.

If you manually implement Future on your queue/container, you control the order in which it internally polls its content.
As long as you put all futures related to these two priority levels in the same queue that knows how to poll them, you should be able to poll higher-priority ones first.


If the high-priority task becomes ready before you’re done processing the same round of low-priority futures, it should interrupt that processing to be polled. An executor can do that check by reading flags set by the different Wakers. A Future-based implementation can only do it by repeatedly polling the high-priority subfutures, which could be less efficient.

What you described is essentially the thing I am looking for. And the problem of "soft-real-time" is exactly what I have got.

Could you please elaborate a little bit more? I understand the idea, but I do not understand how to put it in code... The internals of futures are unknown to me... Maybe you could give some pseudo code showing how to wire a future to a queue?

If I have got async fn process_request(request: Request) -> Response { ... }, I understand how to craft a custom future with a poll function, which I could return from the process_request function, but I do not understand how this custom future becomes aware of all the other futures... which are somewhere in a queue...

Does the executor poll futures one by one in a loop? If so, low-priority futures can return not-ready while there are high-priority futures, but I think it will be a waste of CPU resources, as the executor will continuously come back to these low-priority futures in the same loop... Correct?

This sort of approach could work, but there’s a little more to it. The executor won’t necessarily poll the Future again until its Waker has been notified; you’ll need to store the Waker and handle the notification yourself when the low-priority futures become re-enabled.

Could you please post some pseudo code and references to the functions involved there? At the moment I cannot understand even 10% of this sentence :slight_smile:

Found this: https://rust-lang.github.io/async-book/02_execution/02_future.html I hope it answers my questions...


Executors don’t poll futures in a tight loop. Instead, they poll each future once and then wait to be told there’s more work to do. The key part of this can be found in the Future::poll documentation:

When a future is not ready yet, poll returns Poll::Pending and stores a clone of the Waker copied from the current Context. This Waker is then woken once the future can make progress.

If you’re writing your own implementation of Future, you’ll need to ensure this holds. Usually, that just means passing the Context to your child Futures.

If you’re going to artificially return Pending without actually polling those futures, though, you’ll be responsible for calling Waker::wake yourself once your code is ready to poll them again. Otherwise, one of the wake signals could get lost, preventing a Future from completing.


I understand this now. Thanks

One option for implementing rather naĂŻve prioritization would be to start multiple runtimes, with the low-priority runtime given far fewer threads than the high-priority runtime, then spawn tasks onto the appropriate runtime.

One issue you can run into with a prioritization system is known as priority inversion. Commonly, you'll need something like priority inheritance, so that if a high-priority task is blocked on a low-priority task (e.g. waiting on an async mutex it holds, or waiting for a response on a channel), the low-priority task gets boosted to high priority until that completes. To implement something like this you would need a set of synchronization primitives that know about the priority system in use, so that e.g. a channel receiver, when waited on, can tell the system to boost the task that owns the sender (identifying this task is also going to be difficult, or require a weird API where the task pre-registers interest in sending into the sender at some point in the future).


Thanks, I am aware of it. I have found I can intercept wakers' wake-up calls. It means I can trace which futures can be progressed and which are pending. So I can put all 1st-priority futures into "progression" first. When all 1st-priority futures are complete, or are all waiting (on a mutex, the network, or disk), 2nd-priority futures can be progressed. In this case it should not run into the issue you described.
