Potential improvements to make an async function run faster

Hi all. I am working on some async tasks using tokio and hoping to get some suggestions on how to make my async function potentially faster.

Thanks in advance!

use futures::stream::{FuturesUnordered, StreamExt};
use serde_json::{json, Value};
use tokio::sync::Semaphore;

pub async fn can_this_be_faster(todos: Vec<String>) -> Result<Value, Error> {
    // e.g. a hyper client (Client and Error are placeholder types)
    let client = std::sync::Arc::new(Client);

    let mut task_keys = vec![];
    let mut fetch_task_key_futs = FuturesUnordered::new();
    
    // Limit the number of concurrent tasks due to the hard limit on
    // file descriptors, e.g. on AWS Lambda.
    let sem = std::sync::Arc::new(Semaphore::new(100));

    for todo in todos {
        let client = client.clone();
        let sem = sem.clone();

        let permit = sem.acquire_owned().await.unwrap();

        fetch_task_key_futs.push(tokio::spawn(async move {
            let res = fetch_task_keys(todo, client).await;
            drop(permit);
            res
        }));
    }
    
    while let Some(result) = fetch_task_key_futs.next().await {
        let res = result.unwrap().unwrap();
        task_keys.extend(res);
    }
    
    let mut ret = vec![];
    let mut task_futs = FuturesUnordered::new();
    
    for key in task_keys {
        let sem = sem.clone();
        let client = client.clone();
        let permit = sem.acquire_owned().await.unwrap();

        task_futs.push(tokio::spawn(async move {
            let ret = do_the_task(key, client).await;
            drop(permit);
            ret
        }));
    }
    
    while let Some(result) = task_futs.next().await {
        let res = result.unwrap().unwrap();
        ret.push(res);
    }

    Ok(json!(ret))
}

async fn fetch_task_keys(
    todo: String,
    client: std::sync::Arc<Client>,
) -> Result<Vec<String>, Error> {
    // This usually takes a while (~0.3 seconds) and returns a long list.
    client.fetch(todo).await
}

async fn do_the_task(
    task_key: String,
    client: std::sync::Arc<Client>,
) -> Result<Value, Error> {
    // This is usually very fast, ~0.01 seconds.
    let task = client.fetch_task(task_key).await?;
    // Do the processing; also usually very fast.
    let res = process(task);
    Ok(json!({"result": res}))
}

Have you tried not using tokio::spawn?

It would help if you had a minimal working example, on the playground for example.

When you are using tokio::spawn, there is no reason to use a FuturesUnordered. Just put them in a vector and replace the while let loops with a for loop over the vector. This will be a bit faster because FuturesUnordered allocates for every item, but a Vec makes only a few (or even one if you tell it the length up front).
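For example, the first loop could look something like this (an untested sketch, reusing the client, semaphore, and fetch_task_keys from your post):

let mut handles = Vec::with_capacity(todos.len());

for todo in todos {
    let client = client.clone();
    let permit = sem.clone().acquire_owned().await.unwrap();

    handles.push(tokio::spawn(async move {
        let res = fetch_task_keys(todo, client).await;
        drop(permit);
        res
    }));
}

let mut task_keys = vec![];
for handle in handles {
    // Awaiting the handles in spawn order is fine; the total wait is the same.
    task_keys.extend(handle.await.unwrap().unwrap());
}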

Hi @ambiso. Thanks for your comments. Are there any other suggestions you have in mind if I don't use tokio::spawn? I can think of mpsc + threads, but I am not sure how much better it would be.

Hi @alice. Thanks for your suggestions. I have tried using Vec::with_capacity instead of FuturesUnordered, but there seems to be no significant performance improvement; the results are very similar. Maybe with a vec we get more spurious wakeups (not sure :stuck_out_tongue: ), so we don't see the expected performance boost?

I was thinking of using an mpsc channel + multiple worker tasks to process the futures, as sketched below. It would be more complicated compared to the current code, but I am not sure how much better it would be, since I think tokio is pretty well optimized already.
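Roughly what I have in mind (an untested sketch; run_with_workers is a made-up name, and the actual fetch call is elided):

use std::sync::Arc;
use tokio::sync::{mpsc, Mutex};

pub async fn run_with_workers(todos: Vec<String>) {
    // tokio's mpsc receiver is single-consumer, so the workers share
    // it behind a Mutex and take jobs one at a time.
    let (tx, rx) = mpsc::channel::<String>(100);
    let rx = Arc::new(Mutex::new(rx));

    let mut workers = Vec::with_capacity(100);
    for _ in 0..100 {
        let rx = Arc::clone(&rx);
        workers.push(tokio::spawn(async move {
            loop {
                // The lock guard is dropped at the end of this statement,
                // before the job is processed.
                let job = { rx.lock().await.recv().await };
                match job {
                    // fetch_task_keys(todo, client).await would go here.
                    Some(_todo) => {}
                    None => break, // channel closed and drained
                }
            }
        }));
    }

    for todo in todos {
        tx.send(todo).await.unwrap();
    }
    drop(tx); // close the channel so the workers shut down

    for w in workers {
        w.await.unwrap();
    }
}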

It looks like do_the_task is an async function but does not feature any await points, is that intentional?

What is the performance problem with your code? Have you benchmarked it?

If fetching a task key takes 0.3 seconds, I would expect the code to be network bound, rather than by any of the code you have presented.

Hi @geeklint. Thanks for your reply. Very good catch. The actual code does have an await point; I forgot it in this particular example, but there should be an .await after fetch_task (I will update that). I don't think I can do anything about the network-bound part (e.g. the time it takes to execute fetch_task_keys and do_the_task is pretty much limited by the network). Since these tasks are network-bound, that's why I came up with using tokio to run them asynchronously to speed things up. Because I need to run this in a controlled environment, I limit the number of concurrent tasks with a semaphore to avoid running out of resources.

Maybe my assumption is not correct. My assumption was: if each fetch_task_keys takes 0.3s and I have 10 of them, then in a synchronous world I would spend 3 seconds finishing them, but in an asynchronous world I expect all 10 tasks to be completed in 0.3s plus some small overhead, say 0.5s in total. Maybe my estimate of 0.5 seconds is too optimistic? That's why I was asking if there is any inefficiency in my code.
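As a toy check of that expectation (using tokio::time::sleep as a stand-in for the network call):

use std::time::Instant;
use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() {
    let start = Instant::now();
    let mut handles = vec![];
    for _ in 0..10 {
        // Stand-in for one ~0.3s network fetch.
        handles.push(tokio::spawn(sleep(Duration::from_millis(300))));
    }
    for h in handles {
        h.await.unwrap();
    }
    // Prints ~0.3s, not 3s, since the sleeps run concurrently.
    println!("elapsed: {:?}", start.elapsed());
}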

I have added a minimal compiling example that simulates this in the playground: Rust Playground. It takes a few seconds to execute, and I was hoping to minimize that time.

Maybe tokio is already doing a great job here. I am just trying to understand where the extra time is spent in this particular case.

I mean, using a vector instead should only make a very small difference. It would be virtually nothing compared to 0.3 seconds.

I've tweaked your playground slightly; mostly I added some printlns to track how long each section was taking, and it took 0.7-0.8s to complete in my experience (note: compiling usually took 3-5s).

On a hunch, I threw the acquire_owned calls inside the async blocks, instead of before. This brought the runtime down to 0.5s.

Playground
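That is, the first loop becomes something like this (the second loop changes the same way):

for todo in todos {
    let client = client.clone();
    let sem = sem.clone();

    fetch_task_key_futs.push(tokio::spawn(async move {
        // Acquiring the permit inside the task means spawning never
        // blocks this loop; the semaphore only gates the actual work.
        let permit = sem.acquire_owned().await.unwrap();
        let res = fetch_task_keys(todo, client).await;
        drop(permit);
        res
    }));
}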

The only other thing I could think of would be that you're waiting for all fetch_task_key operations to finish before starting any do_the_task. (This spins your semaphore down to zero utilization). If you had your fetch_task_key operations spawn the do_the_task operations directly, you could keep your semaphore fully utilized throughout the process.

This probably slightly changes the behavior/ordering, although since you're using FuturesUnordered, it suggests to me that's not a concern.

Hi @geeklint. Thanks for your help and suggestions :grinning: I tried a similar approach previously, where I used fetch_task_keys to spawn do_the_task directly and collected the results inside fetch_task_keys.
Below is pseudocode of that previous structure:

async fn main() {
    let mut ret = vec![];
    let mut futs = FuturesUnordered::new();
    for todo in todos {
        futs.push(tokio::spawn(async move { fetch_task_keys(...).await }));
    }

    while let Some(result) = futs.next().await {
        ret.extend(result);
    }
    ret
}

async fn fetch_task_keys( ... ) -> Result<Vec<Value>, Error> {
    let mut ret = vec![];
    let keys = client.fetch(todo).await?;
    let mut futs = FuturesUnordered::new();
    for key in keys {
        futs.push(tokio::spawn(async move { do_the_task(key, ...).await }));
    }
    while let Some(result) = futs.next().await {
        ret.push(result);
    }
    Ok(ret)
}

async fn do_the_task( ... ) -> Result<Value, Error> {
    // This is usually very fast, ~0.01 seconds.
    let task = client.fetch_task(task_key).await?;
    let res = process(task);
    Ok(json!({"result": res}))
}

This approach was actually a little bit slower (by a small amount, less than 1s) than the version in the Rust Playground (same link I posted above). I am guessing the slowdown could be due to the larger futures returned by the fetch_task_keys function?

What I noticed, which is quite interesting, was that reducing the semaphore limit can sometimes make the function run faster ... (not in the playground...). The actual function runs in an AWS Lambda function, so maybe due to the limited hardware resources it cannot reach its full potential.

I'm not sure if it's been mentioned yet, but you can also use buffer_unordered. Playground.

You can also avoid the

        .collect()
        .await;

for task_keys, but then it will not enforce the limit of 100 concurrent tasks properly. Playground.
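For reference, the first playground is roughly this shape (a sketch reusing Client, Error, and fetch_task_keys from your post; with_buffer_unordered is a made-up name):

use futures::stream::{self, StreamExt};

pub async fn with_buffer_unordered(todos: Vec<String>) -> Vec<Result<Vec<String>, Error>> {
    let client = std::sync::Arc::new(Client);

    stream::iter(todos)
        .map(|todo| {
            let client = client.clone();
            async move { fetch_task_keys(todo, client).await }
        })
        // At most 100 fetches are polled concurrently; no semaphore needed.
        .buffer_unordered(100)
        .collect()
        .await
}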

Hi @ambiso. Thanks for your suggestions. I have tested the solution, and the speed is pretty much the same as the version using the semaphore, but the code does look cleaner. Thanks :slight_smile:
