I'm trying to achieve something that I've only been able to do with a custom executor. I want to be able to execute a set of futures and guarantee that only n
futures are executing at any given moment.
Here's a naive implementation of an executor attempting to do this:

```rust
pub fn execute(&mut self) {
    loop {
        if let Ok(message) = self.rx.recv() {
            // If this is a fresh task and we're at capacity,
            // enqueue it instead of polling it.
            if *message.fresh.read().unwrap()
                && self.active_task_count == self.max_active_task_count
            {
                self.standby_task_queue.push_back(message);
                continue;
            }
            let mut future_lock = message.future.lock().unwrap();
            if let Some(mut future) = future_lock.take() {
                if *message.fresh.read().unwrap() {
                    *message.fresh.write().unwrap() = false;
                    self.active_task_count += 1;
                }
                let waker = waker_ref(&message);
                let mut cx = Context::from_waker(&waker);
                match future.as_mut().poll(&mut cx) {
                    Poll::Ready(_) => {
                        // This future was the last reference to the task,
                        // so the task is dropped here and disappears forever.
                        // Pop the next available task from the standby queue
                        // and begin work on it.
                        self.active_task_count -= 1;
                        if let Some(next_task) = self.standby_task_queue.pop_front() {
                            next_task.wake();
                        }
                    }
                    Poll::Pending => {
                        *future_lock = Some(future);
                    }
                }
            }
        }
    }
}
```
This does not guarantee that futures are executed in the order they were submitted; there are races between the active count being decremented and a new item being received. That is intentional and acceptable for the most part.
The core behavior here is that there is a maximum number of "alive" futures being polled at any time. This is distinct from a thread pool, where a fixed number of threads poll as many futures as possible. It is also distinct from spawning backpressure: I don't want senders to block
when I've hit the limit of futures. I want them to be enqueued so the executor can deal with them later. Right now the custom executor uses a linked list as a store for queued futures.
This is important when doing things like opening thousands of files and reading from them asynchronously without having thousands of file descriptors open at any given moment.
If I'm approaching this problem incorrectly, or there are other ways to tackle it, I would very much appreciate the feedback. Otherwise, I'm looking for a way to express this pattern on the tokio runtime. Libraries like hyper
expect to be running on the tokio runtime, and using a custom executor creates all sorts of problems.
Cheers,
Kevin