Limited concurrency for future execution (tokio)

This weekend I'm looking a bit more into rust async and I created this sample program.

My actual solution would take 4 futures (max_concurrent) and then after ALL 4 futures are finished it executes the next 4 (like chunking). This is ok for short running tasks or if all tasks need equal'ish time. But I would say in best case this is half async. :melting_face:

Without any limits all possible tasks are started directly. Depending on the client it can be problematic to start 200 simultaneous downloads.

So my problems are:

  1. How can I change the code, that the concurrency maximum is always used instead of working in batches. Do I need something like future-queue? Or is it necessary to use streams for this?

  2. Does it make sense to use tokio::spawn in this context? From my understanding the program uses only 1 (os) thread at the moment? If possible I would like to configure how many threads/cores are used. I played around with spawn but it executes the task instantly and only the result seems to be available as a future.

My experience with async stuff is like 5 days ...

thx & cheers :slight_smile:

Peter

here directly the code for reference:

use futures::future::join_all;
use futures::Future;
use rand::{thread_rng, Rng};
use std::mem;
use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() {
    let ids: Vec<u64> = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
    let max_concurrent: usize = 4;
    let mut futures = Vec::new();
    for id in ids {
        let future = async move {
            let num: u64 = thread_rng().gen_range(10..100);
            println!("START id: {} with {}ms", &id, &num);
            sleep(Duration::from_millis(num)).await;
            println!("STOP id: {}", id);
        };
        futures.push(future);
    }
    println!(
        "{} futures and max {} parallel executions",
        futures.len(),
        max_concurrent
    );
    execute_futures(futures, max_concurrent).await;
}

async fn execute_futures(futures: Vec<impl Future>, max_concurrent: usize) {
    let mut running_futures = Vec::new();

    for future in futures {
        if running_futures.len() == max_concurrent {
            let _ = join_all(mem::take(&mut running_futures)).await;
        }
        running_futures.push(future);
    }

    // join the rest
    join_all(running_futures).await;
}
1 Like

You should give JoinSet from the Tokio crate a try.

Hi alice

I don't know exactly, to which of my two questions your reply relates.

To be honest I started the "journey" with joinset and it confused me a lot. I think it was because spawn & join_next() don't work like the other futures + join()/join_all().

I tried joinset with a simple example that worked fine (starting ALL tasks concurrently without limit). Later I started to try something like the code below. Obviously it makes no sense, despite that it is not finished (my counter is not counting etc.). I gave up when I realized that I misunderstood how tokio::spawn works. I thought it would spawn the task when calling join_next().await, but this is not the case. (don't know if the mutex is necessary, but it compiles .. ^^).

If I would have read the documentation more carefully it is pretty clear:

Spawn the provided task on the JoinSet, returning an AbortHandle that can be used to remotely cancel the task.

You do not have to .await the returned JoinHandle to make the provided future start execution. It will start running in the background immediately when spawn is called.

This "do not have to" sounded to me like "you don't have to, but you can" ... ^^.

Can you show me how I could control/limit the spawned tasks in an async fashion? I think I would need something like an "async iterator" (don't know if that exists) over "ids" wrapped in a control loop to limit the amount of spawned tasks.

use rand::{thread_rng, Rng};
use std::sync::Arc;
use tokio::sync::Mutex;
use tokio::task::JoinSet;
use tokio::time::{sleep, Duration};


#[tokio::main]
async fn main() {
    const ITEMS: u64 = 10;
    let ids: Vec<u64> = (1..=ITEMS).into_iter().collect();
    let max_concurrent: usize = 2;
    let futures = Arc::new(Mutex::new(JoinSet::new()));
    for id in ids {
        create_futures(Arc::clone(&futures), id).await;
    }
    println!(
        "{} futures and max {} parallel executions",
        ITEMS, max_concurrent
    );
    execute_futures(Arc::clone(&futures), max_concurrent).await;
}
async fn create_futures(futures: Arc<Mutex<JoinSet<()>>>, id: u64) {
    let mut locked_futures = futures.lock().await;
    locked_futures.spawn(async move {
        let num: u64 = thread_rng().gen_range(10..200);
        println!("START id: {} with {}ms", &id, &num);
        sleep(Duration::from_millis(num)).await;
        println!("STOP id: {}", id);
    });
}

async fn execute_futures(futures: Arc<Mutex<JoinSet<()>>>, max_concurrent: usize) {
    let counter: usize = 0;
    let mut locked_futures = futures.lock().await;
    while locked_futures.join_next().await.is_some() {
        loop {
            if counter < max_concurrent {
                break;
            }
        }
    }
}

How about something like this?

use tokio::time::{sleep, Duration};
use rand::{thread_rng, Rng};
use tokio::task::JoinSet;

#[tokio::main]
async fn main() {
    let max_concurrent = 2;
    let ids: Vec<u64> = (1..=10).into_iter().collect();
    let mut join_set = JoinSet::new();
    
    for id in ids {
        while join_set.len() >= max_concurrent {
            join_set.join_next().await.unwrap().unwrap();
        }
        join_set.spawn(my_bg_task(id));
    }
    
    println!("DONE SPAWNING");
    
    while let Some(output) = join_set.join_next().await {
        output.unwrap();
    }
    
    println!("ALL DONE");
}

async fn my_bg_task(id: u64) {
    let num: u64 = thread_rng().gen_range(10..200);
    println!("START id: {} with {}ms", id, num);
    sleep(Duration::from_millis(num)).await;
    println!("STOP id: {}", id);
}
5 Likes

Would this alternate phrasing have helped avoid that?

Thank you for the runnable example, looks great.

Short after my reply (during googling for "async iterator" ^^ ) I realized, that probably another while loop is the way to go.
Good timing and you saved me some time here :slight_smile: thank you very much!!

I will give it a try.

The provided future will start running in the background immediately
/// when spawn is called, even if you don't await the returned
/// JoinHandle.

Yes that sounds very good to me.
I hope my "approve" in github was ok/expected. I don't have much experience with it ... :see_no_evil: :sweat_smile:

1 Like

@alice
I was able to integrate the stuff in my app and I also created a simple example with indicatif based on your example here.

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.