Async queue with concurrent batch

I've been trying to figure out the best approach to handle async tasks in a queue with the ability to work on a maximum number of tasks concurrently.

I currently have a channel that, whenever it receives a message, pushes a future into a futures::stream::FuturesOrdered. Once fut.len() reaches the max, I run the buffered tasks concurrently. This was heavily based on this answer.

This works very well. However, rather than waiting for the tasks to buffer up to the max before running them, I want to continuously take and run available tasks whenever there is capacity (up to the max limit).

I'm unsure of the best approach for this, especially as it doesn't seem I can share FuturesOrdered between threads (I would have one thread to continuously fut.push(rx.event) and a separate one to process the futures). Maybe using a stream and take(max) would give me more flexibility?

You might be looking for a tokio::sync::Semaphore.

The second example in their docs shows how you can run a set number of tasks in parallel by acquiring the semaphore and spawning a task for each item. By moving the RAII guard (permit) into the spawned task, the semaphore will be automatically released when the task exits.

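use std::sync::Arc;
use tokio::sync::Semaphore;

#[tokio::main]
async fn main() {
    // At most 3 permits exist, so at most 3 tasks run at the same time.
    let semaphore = Arc::new(Semaphore::new(3));
    let mut join_handles = Vec::new();

    for _ in 0..5 {
        // Waits here until one of the 3 permits is free.
        let permit = semaphore.clone().acquire_owned().await.unwrap();
        join_handles.push(tokio::spawn(async move {
            // perform task...
            // explicitly own `permit` in the task
            drop(permit);
        }));
    }

    // Wait for every spawned task to finish.
    for handle in join_handles {
        handle.await.unwrap();
    }
}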

You might be able to adapt that to your purposes.

Thanks for this, I wasn't aware of semaphore and will look into it!
