Async queue with concurrent batch

I've been trying to figure out the best approach to handle async tasks in a queue with the ability to work on a maximum number of tasks concurrently.

I currently have a channel which when receives pushes a future in to a futures::stream::FuturesOrdered. I then call fut.next() when fut.len() reaches the max, and tasks concurrently. This was heavilly based of this answer.

This works very well, however instead of waiting for the tasks to buffer/reach the max before calling fut.next(), I instead want to continuously take and run available tasks whilst there is availability (up to the max limit).

I'm unsure the best approach for this, especially as it doesn't seem I can share FuturesOrdered between threads (I would have one thread to continuously fut.push(rx.event) and a separate to process the fut.next()). Maybe using a stream and take(max) would give me more flexibility?

You might be looking for a tokio::sync::Semaphore.

The second example in their docs shows how you can run a set number of tasks in parallel by acquiring the semaphore and spawning a task for each item. By moving the RAII guard (permit) into the spawned task, the semaphore will be automatically released when the task exits.

use std::sync::Arc;
use tokio::sync::Semaphore;

#[tokio::main]
async fn main() {
    let semaphore = Arc::new(Semaphore::new(3));
    let mut join_handles = Vec::new();

    for _ in 0..5 {
        let permit = semaphore.clone().acquire_owned().await.unwrap();
        join_handles.push(tokio::spawn(async move {
            // perform task...
            // explicitly own `permit` in the task
            drop(permit);
        }));
    }

    for handle in join_handles {
        handle.await.unwrap();
    }
}

You might be able to adapt that to your purposes.

1 Like

Thanks for this, I wasn't aware of semaphore and will look into it!

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.