Batch execution of futures in the tokio runtime (or max number of active futures at a time)

I'm trying to achieve something that I've only been able to do with a custom executor. I want to be able to execute a set of futures and guarantee that only n futures are executing at any given moment.

Here's a naive implementation of an executor that attempts to do this:

    pub fn execute(&mut self) {
        loop {
            if let Ok(message) = self.rx.recv() {
                if *message.fresh.read().unwrap() && self.active_task_count == self.max_active_task_count {
                    // At capacity: enqueue this fresh task instead of polling it now.
                    self.standby_task_queue.push_back(message);
                    continue;
                }

                let mut future_lock = message.future.lock().unwrap();
                if let Some(mut future) = future_lock.take() {
                    if *message.fresh.read().unwrap() {
                        *message.fresh.write().unwrap() = false;
                        self.active_task_count += 1;
                    }

                    let waker = waker_ref(&message);
                    let mut cx = Context::from_waker(&*waker);

                    match future.as_mut().poll(&mut cx) {
                        Poll::Ready(_) => {
                            // This future was the last reference to the task, and the
                            // task gets dropped here. It disappears forever.

                            // Pop the next available task from the standby queue and
                            // wake it so it gets re-sent to us.
                            self.active_task_count -= 1;
                            if let Some(next_task) = self.standby_task_queue.pop_front() {
                                next_task.wake();
                            }
                        }
                        Poll::Pending => {
                            *future_lock = Some(future);
                        }
                    }
                }
            }
        }
    }

This does not guarantee that futures are executed in the order they were submitted, and there are race conditions between the active count being decremented and an item being received. This is intentional and acceptable for the most part.

The core behavior here is that there is a maximum number of "alive" futures being polled. This is distinct from a thread pool, where a certain number of threads are used to poll as many futures as possible. It is also distinct from spawning backpressure, as I don't want senders to block when I've hit the limit of futures. I want them to be enqueued so the executor can deal with them later. Right now the custom executor uses a linked list as a store for queued futures.

This is important when doing things like opening thousands of files and reading from them asynchronously without having thousands of file descriptors open at any given moment.

If I'm approaching this problem incorrectly, or there are other ways to tackle it, I would very much appreciate the feedback. Otherwise, I'm looking for a way to execute this pattern on the tokio runtime. Libraries like hyper expect to be running on the tokio runtime, and using a custom executor creates all sorts of problems.

Cheers,
Kevin

It sounds like you are looking for buffer_unordered.

    use futures::stream::StreamExt;

    let urls = vec![...];

    let mut stream = futures::stream::iter(urls)
        .map(|url| fetch_url_with_reqwest(url))
        .buffer_unordered(10);

    while let Some(response) = stream.next().await {
        // handle response
    }

    async fn fetch_url_with_reqwest(url: UrlType) -> ResponseType {
        ...
    }

This would fetch all the urls in the vector concurrently, at most 10 at a time.

This might totally be it. Thanks for pointing me to this.

It is also worth being aware of FuturesUnordered, which is how buffer_unordered is internally implemented. It is more powerful than buffer_unordered, but a bit more cumbersome to use.

    use futures::stream::{FuturesUnordered, StreamExt};

    let urls = vec![...];
    let mut futs = FuturesUnordered::new();

    for url in urls {
        futs.push(fetch_url_with_reqwest(url));

        // If full, wait for one to finish.
        if futs.len() >= 10 {
            let response = futs.next().await.unwrap();
            // handle response
        }
    }

    // Wait for the remaining to finish.
    while let Some(response) = futs.next().await {
        // handle response
    }

    async fn fetch_url_with_reqwest(url: UrlType) -> ResponseType {
        ...
    }

The example above is 100% equivalent to the one using buffer_unordered.

If you need to insert multiple different futures, you might have the compiler start complaining. In that case you can use Either or FutureExt::boxed on the futures to make them into a single type.
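As an illustrative, std-only sketch of that type erasure (the `fetch_a`/`fetch_b` names are made up): `FutureExt::boxed` is essentially `Box::pin` plus a cast to a trait object, so two distinct `async fn` types can live in one collection.

```rust
use std::future::Future;
use std::pin::Pin;

// Two async fns with the same output type but distinct, anonymous future types.
async fn fetch_a() -> u32 { 1 }
async fn fetch_b() -> u32 { 2 }

// Boxing erases the concrete types, so both fit in one collection.
// This is essentially what FutureExt::boxed does (boxed also adds a Send bound).
fn boxed_futures() -> Vec<Pin<Box<dyn Future<Output = u32>>>> {
    vec![Box::pin(fetch_a()), Box::pin(fetch_b())]
}
```

The same trick works for pushing mixed future types into a `FuturesUnordered`.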

This is incredibly helpful. It didn't occur to me to check the extension traits of stream. I saw chunks at some point but that would force some awkward join_alls in different places.

It seems like the approach with ready_to_run_queue is similar to the way that the executor I use works. It didn't occur to me to use tasks themselves as a linked list. It's still very unclear to me where the actual binding of waker -> future occurs and how it guarantees that each future isn't polled unnecessarily (i.e. determining which future in the queue did the waking, dequeuing it and polling it); but if it works, it works.

It does seem to work as intended. Thanks again for pointing me to this.

The short story of how it polls the right future is the following:

  1. Before FuturesUnordered polls a future, it creates a Waker specific to that future.
  2. When polling the future, it passes the waker to the future.
  3. Due to the contract of the `Future` trait, that future is now supposed to emit a wake-up on the waker it was given when it wants to be polled again.
  4. Once such a wake-up is emitted, the special Waker stores some info in the FuturesUnordered, and forwards the wake-up to whatever outer Waker the FuturesUnordered itself was polled with.
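The bookkeeping in step 4 can be sketched with plain std types. The `SlotWaker` name, the shared `woken` list, and the `outer` field here are illustrative stand-ins, not the actual FuturesUnordered internals (which use an intrusive lock-free list):

```rust
use std::sync::{Arc, Mutex};
use std::task::{Wake, Waker};

// A hypothetical per-future waker: it records *which* future woke up,
// then forwards the wake-up to the outer waker the combinator was polled with.
struct SlotWaker {
    index: usize,                  // which future this waker belongs to
    woken: Arc<Mutex<Vec<usize>>>, // shared "ready to run" list
    outer: Waker,                  // waker of whoever polled the combinator
}

impl Wake for SlotWaker {
    fn wake(self: Arc<Self>) {
        self.woken.lock().unwrap().push(self.index); // step 4: record which slot woke
        self.outer.wake_by_ref();                    // ...and forward the wake-up
    }
}
```

On its next poll, the combinator would drain `woken` and poll only those indices, which is how each future avoids being polled unnecessarily.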

I've finally grasped it in its entirety, I think. Thanks for the help. Futures are really tricky to think about when they're not abstracted away from you by some runtime.

The waker that's being created is the Task itself (via ArcWake). The task reschedules itself onto the ready_to_run_queue. The task holds a reference to the waker of the FuturesUnordered stream, so when it is woken, it subsequently wakes the FuturesUnordered as well.

The thing that I was missing is that the waker for the future is also behind a weak pointer on the ready_to_run_queue and is updated on each poll.

The use of AtomicWaker also helps me understand how all of this is coordinated as a whole. If multiple tasks are queued with the same waker, they could potentially call the same waker multiple times (which you can't do). AtomicWaker, however, gives you a wake(&self) that is idempotent.
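A much-simplified, mutex-based sketch of that contract (illustrative only; the real `futures::task::AtomicWaker` is lock-free, and the names here are made up):

```rust
use std::sync::Mutex;
use std::task::Waker;

// Sketch of the AtomicWaker contract: `register` stores the most recent
// waker, and `wake` takes it out and fires it. A second `wake` with
// nothing registered is a no-op, so duplicate wake-ups are harmless.
struct SimpleAtomicWaker {
    waker: Mutex<Option<Waker>>,
}

impl SimpleAtomicWaker {
    fn new() -> Self {
        Self { waker: Mutex::new(None) }
    }

    // Called on each poll with the current task's waker.
    fn register(&self, waker: &Waker) {
        *self.waker.lock().unwrap() = Some(waker.clone());
    }

    // Takes &self, so many queued tasks can all call it; only the first
    // call after a `register` actually wakes anyone.
    fn wake(&self) {
        if let Some(w) = self.waker.lock().unwrap().take() {
            w.wake();
        }
    }
}
```

Because `wake` takes the stored waker out, any number of redundant `wake` calls after the first are no-ops until the task re-registers on its next poll.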

Reading all of this really helped me understand some underlying future bits.


@kvnvelasco I know this is your thread, but since it's fresh in your mind, any chance you'd mind explaining how the executor you use works? I have been trying to understand how FuturesUnordered manages tasks. Full unresolved thread here, but basically I'm trying to understand how the linked list works with ready_to_run_queue. Only asking since much of the complexity is in ready_to_run_queue, which you seem to understand from the above. Thanks @alice for the link

@fooxed I'm incredibly sorry I didn't see this.

I was on holiday for 2-ish weeks after this and it slipped my mind to check.
Have you sorted this out?

We can probably go over it in the discord.

Cheers,
Kevin