Async func not running concurrently

I have two async functions that I'm trying to run concurrently using the join! macro.
A snippet of the code:
    let imgRatingFlowTimeStart = Instant::now();
    let poll_result = polling_downloading(foodImgsFolder.clone(), &awsRegion, &sqsQueueName, &s3BucketB2);
    let inference_result = model_inference(foodImgsFolder.to_string(), redisConnStr.to_string(), &graphSeshVecFC, &graphSeshVecFS, currentImgCount);
    join!(poll_result, inference_result);
    info!("Image rating flow completed execution in: {:?}", imgRatingFlowTimeStart.elapsed());

An SQS poll takes about 100 ms. The polling step checks SQS for new images and, if any are present, downloads them to a local directory. Model inference then runs on the images in that directory.
My goal is to run the two functions concurrently, so that SQS is polled and new images are downloaded while model inference is running on the current batch.
I log timings inside both functions. For some reason the inference runs to completion first and only then does the polling run: the polling waits for the inference to finish instead of running alongside it. The output is:
    Model inference & result write complete in: 480.206431ms
    SQS Poll time : 562.313015ms || Image Count: 10
    S3 Image Download time : 104.715685ms
    Image rating flow completed execution in: 667.115811ms

Why is this happening, and is there another way to make them run concurrently or in parallel?

What happens inside model_inference? It looks like this function is not really asynchronous.

Inside model_inference I call a couple of other async functions and await them.
Could that be the problem?

Rust's async model is cooperative: a task can do as much computation as it wants, and the executor only gets a chance to run another task when the current one reaches an .await that isn't ready yet. If none of the futures your model inference code awaits is ever actually blocked (i.e. ever returns Pending), it won't let anything else run. You may need to add some yield_now calls so the downloads can get started.

(tokio docs) (async_std docs)
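To make the cooperative-scheduling point concrete, here is a minimal std-only sketch. It uses a toy executor and a simplified `join_busy` that polls two futures in a fixed order (the real `join!` macros differ in details such as polling order), plus a hand-written `YieldNow` future standing in for `tokio::task::yield_now()`. The names `join_busy`, `YieldNow` and `run` are hypothetical. Without a yield point, the "inference" future finishes its whole loop in a single poll before the "polling" future is ever touched; with one, the two interleave.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

// A waker that does nothing: the toy executor simply re-polls in a loop.
struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Hypothetical stand-in for tokio::task::yield_now(): returns Pending once,
// then Ready, giving the executor a chance to poll other futures.
struct YieldNow {
    yielded: bool,
}

impl Future for YieldNow {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.yielded {
            Poll::Ready(())
        } else {
            self.yielded = true;
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

type BoxFut = Pin<Box<dyn Future<Output = ()>>>;

// Simplified join: poll `a`, then `b`, on the current thread until both finish.
fn join_busy(mut a: BoxFut, mut b: BoxFut) {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let (mut a_done, mut b_done) = (false, false);
    while !(a_done && b_done) {
        if !a_done {
            a_done = a.as_mut().poll(&mut cx).is_ready();
        }
        if !b_done {
            b_done = b.as_mut().poll(&mut cx).is_ready();
        }
    }
}

// Runs an "inference" loop and a "polling" loop under join_busy and
// returns the order in which their steps executed.
fn run(inference_yields: bool) -> Vec<String> {
    let log = Arc::new(Mutex::new(Vec::<String>::new()));
    let (l1, l2) = (Arc::clone(&log), Arc::clone(&log));
    let inference: BoxFut = Box::pin(async move {
        for i in 0..3 {
            l1.lock().unwrap().push(format!("infer {i}"));
            if inference_yields {
                YieldNow { yielded: false }.await;
            }
        }
    });
    let polling: BoxFut = Box::pin(async move {
        for i in 0..3 {
            l2.lock().unwrap().push(format!("poll {i}"));
            YieldNow { yielded: false }.await; // stands in for awaiting SQS/S3 I/O
        }
    });
    join_busy(inference, polling);
    let order = log.lock().unwrap().clone();
    order
}

fn main() {
    println!("no yield in inference:   {:?}", run(false));
    println!("yield point in inference: {:?}", run(true));
}
```

Yield points only interleave the two futures on a single thread; they do not make the CPU-bound work run in parallel with the downloads.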

You may find the section on CPU-bound tasks and blocking code interesting. The short answer is that if your joined tasks have no .await, the runtime is not able to switch between them.

This is pretty much never what you want. If your code is blocking, don't run it on Tokio.

According to this page (join! - Asynchronous Programming in Rust):

Awaiting the futures one after another won't help me much, and using join! should run them concurrently. But I'm pretty new at this.

I mean the .awaits inside the functions you are joining.

I have now refactored my code so that only polling_downloading has a few .awaits, because of the SQS and S3 calls.
model_inference no longer calls any other async functions, so it contains no .await at all.

My code still runs sequentially rather than concurrently. Should I add .awaits inside the things I'm joining?

Then it should not be async. For computation, i.e. when the CPU is actively doing work, use multithreaded concurrency, not async concurrency.

I understand now.

But the document Why Async? - Asynchronous Programming in Rust suggests that async programming can overcome some limitations of threads.

Also, if model_inference is not async, then I have one async function (polling_downloading) and one regular function (model_inference). Can I still use threads to make them run concurrently?

Yes.
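As a rough std-only sketch of that pattern (the names `model_inference_sync`, `poll_and_download` and `pipeline_step` are hypothetical, and sleeps stand in for the real inference and SQS/S3 work), the CPU-bound stage runs on its own OS thread while the calling thread carries on with the waiting work, and both results are collected at the end. In a Tokio program the thread would typically come from `tokio::task::spawn_blocking` and the waiting side would be an async task.

```rust
use std::thread;
use std::time::{Duration, Instant};

// Hypothetical CPU-bound stage: rates the images already on disk.
// A sleep stands in for the model's compute time.
fn model_inference_sync(images: Vec<String>) -> usize {
    thread::sleep(Duration::from_millis(300));
    images.len()
}

// Hypothetical I/O stage: polls the queue and downloads the next batch.
// A sleep stands in for the SQS poll plus S3 download.
fn poll_and_download(batch: usize) -> Vec<String> {
    thread::sleep(Duration::from_millis(300));
    (0..batch).map(|i| format!("img_{i}.jpg")).collect()
}

// One pipeline iteration: inference on `current` overlaps with downloading
// the next batch. Returns (images rated, next batch).
fn pipeline_step(current: Vec<String>, next_batch: usize) -> (usize, Vec<String>) {
    // Move the CPU-bound work onto its own OS thread.
    let inference = thread::spawn(move || model_inference_sync(current));
    // Meanwhile, this thread does the waiting work.
    let next = poll_and_download(next_batch);
    // Wait for inference; join() returns Err only if that thread panicked.
    let rated = inference.join().expect("inference thread panicked");
    (rated, next)
}

fn main() {
    let start = Instant::now();
    let (rated, next) = pipeline_step(vec!["a.jpg".into(), "b.jpg".into()], 10);
    // Both stages sleep ~300 ms, but they overlap, so this prints roughly 300 ms.
    println!("rated {rated}, downloaded {}, took {:?}", next.len(), start.elapsed());
}
```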

If your "model_inference" is not calling other async functions and has no ".await"s in it, then it should not be an async function itself.

Of course, that means you no longer have a future to pass to "join!".

The solution is to run it on a blocking thread by wrapping it in "tokio::task::spawn_blocking", which returns a JoinHandle that can be awaited like any other future.

            let handle1 = tokio::spawn(async move {
                // Some async code here that uses ".await"
                // ...
            });
            let handle2 = tokio::task::spawn_blocking(move || {
                // Some long running synchronous code here
                // including calls to non-async functions that don't use ".await"
                // ...
            });

            tokio::select! {
                _ = handle1 => {
                    error!("Async task completed.");
                }
                _ = handle2 => {
                    error!("Sync thread completed.");
                }
            }

Okay understood. Thank you so much. I will try this approach.

That document also says a very important thing at the end:

It's important to remember that traditional threaded applications can be quite effective, and that Rust's small memory footprint and predictability mean that you can get far without ever using async. The increased complexity of the asynchronous programming model isn't always worth it, and it's important to consider whether your application would be better served by using a simpler threaded model.

Async is not always a good solution for running many tasks, particularly when they are long-running, compute-intensive jobs that you want distributed over many cores for better compute performance. That is better done with regular synchronous threads.
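As a small sketch of spreading compute over many cores with plain threads: the hypothetical `score_all` below splits a slice of work items into roughly one chunk per available core using `std::thread::scope`, so each thread borrows its chunk without copying, and the per-chunk results are concatenated in input order. `score` is a placeholder for a real per-image computation.

```rust
use std::thread;

// Placeholder for an expensive, CPU-bound per-item computation.
fn score(x: u64) -> u64 {
    (0..1_000u64).fold(x, |acc, k| acc.wrapping_mul(6364136223846793005).wrapping_add(k)) % 100
}

// Scores every item, spreading the work over the available cores.
fn score_all(items: &[u64]) -> Vec<u64> {
    let workers = thread::available_parallelism().map(|n| n.get()).unwrap_or(1);
    // Ceiling division; at least 1 because chunks(0) would panic.
    let chunk_size = ((items.len() + workers - 1) / workers).max(1);
    thread::scope(|s| {
        // One scoped thread per chunk; scoped threads may borrow `items`.
        let handles: Vec<_> = items
            .chunks(chunk_size)
            .map(|chunk| s.spawn(move || chunk.iter().map(|&x| score(x)).collect::<Vec<u64>>()))
            .collect();
        // Join in spawn order so the output order matches the input order.
        handles
            .into_iter()
            .flat_map(|h| h.join().unwrap())
            .collect::<Vec<u64>>()
    })
}

fn main() {
    let items: Vec<u64> = (0..10_000).collect();
    let scores = score_all(&items);
    println!("{} scores, first few: {:?}", scores.len(), &scores[..5]);
}
```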

Async is great when you have thousands or millions of things going on that are not compute-intensive and in fact spend most of their time waiting for external things to happen. The canonical example is a web server maintaining millions of HTTP connections, which spend their time waiting on database responses and so on.

In that case, the great advantage of async (the limitation of threads that the document says it overcomes) is that an async task needs far less memory and overhead than a real OS thread. It avoids much of the work of constantly rescheduling threads and switching contexts.

As has been said, "Synchronous threads are for doing work, async tasks are for waiting."

Of course you can mix them up effectively in a program as shown above.

Sorry, the example I posted above uses "select!", which returns as soon as either branch finishes, but you wanted "join!", which waits for both. I meant to post this version:

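            let handle1 = tokio::spawn(async move {
                // Some async code here that uses ".await"
                // ...
            });
            let handle2 = tokio::task::spawn_blocking(move || {
                // Some long running synchronous code here,
                // including calls to non-async functions that don't use ".await"
                // ...
            });
            // Each JoinHandle resolves to Err if its task panicked or was cancelled.
            let (res1, res2) = tokio::join!(handle1, handle2);
            res1.expect("async task failed");
            res2.expect("blocking task failed");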

Since I'm running this inside a loop, I get errors like "value moved into closure here, in previous iteration of loop" for the arguments I'm passing to model_inference.

I typically deal with 'value moved' errors by taking a ".clone()" of the object in question.

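If the object does not support clone, or is expensive to clone, wrap it in a reference-counting pointer. std::rc::Rc (Rc in std::rc - Rust) only works within a single thread, though; anything moved into tokio::spawn or spawn_blocking needs its thread-safe counterpart, std::sync::Arc, whose clone() only bumps a reference count.

If the shared value also needs to be mutated from several tasks, put it behind a mutex as well, e.g.
tokio::sync::Mutex : https://tokio-rs.github.io/tokio/doc/tokio/sync/struct.Mutex.html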

Cloning may not be efficient for large structures.

Two of the arguments, graphSeshVecFC and graphSeshVecFS, are pretty large (around 200-300 MB), so cloning them would not be a good option, as you mentioned. Mutexes may be a viable option.
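For large read-only data, a minimal sketch (with hypothetical `SessionGraphs` and `run_inference` standing in for graphSeshVecFC/graphSeshVecFS and model_inference, and `std::thread::spawn` standing in for `spawn_blocking`, which has the same `Send + 'static` requirement): wrap the data in `std::sync::Arc` once, outside the loop, and clone only the `Arc` handle on each iteration. Cloning an `Arc` copies a pointer and increments a reference count, so the large buffer is never duplicated, and since nothing mutates it, no mutex is needed.

```rust
use std::sync::Arc;
use std::thread;

// Hypothetical stand-in for the large model sessions.
struct SessionGraphs {
    weights: Vec<f32>,
}

// Hypothetical stand-in for the blocking model_inference: only reads the graphs.
fn run_inference(graphs: &SessionGraphs, img_count: usize) -> f32 {
    graphs.weights.iter().take(img_count).sum()
}

fn run_loop(iterations: usize) -> Vec<f32> {
    // Allocate once; every iteration shares the same buffer.
    let graphs = Arc::new(SessionGraphs { weights: vec![0.5; 1_000] });
    let mut results = Vec::new();
    for i in 0..iterations {
        // Cheap: copies a pointer and bumps the reference count, not the Vec.
        let graphs_for_task = Arc::clone(&graphs);
        // `move` consumes only the clone, so `graphs` is still usable next iteration.
        let handle = thread::spawn(move || run_inference(&graphs_for_task, i + 1));
        // (In the real pipeline, polling/downloading would run here, between spawn and join.)
        results.push(handle.join().expect("inference thread panicked"));
    }
    results
}

fn main() {
    println!("{:?}", run_loop(3));
}
```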
