Actix/Sqlx await without block_on for chaining

Hi there,

I'm facing an issue with my code in actix+sqlx, I feel it's not performant at all as I have to use block_on but required in my code.

Here is an example:

pub async fn get_my_children_with_toys(current_user: AuthenticatedUser, db_pool: web::Data<PgPool>) -> HttpResponse {
  match UserLight::get_my_children(current_user.user_id, &db_pool)
    .await
    .map(|api_response| {
      let content: Vec<FullChildUser> =
        api_response.content
          .into_iter()
          .map(|child| {
            let child_id = child.id;
            ChildrenWithToys {
              child,
              toys: tokio::runtime::Runtime::new().unwrap().block_on(
                     Toys::get_all(child_id, &db_pool)
                  ).unwrap_or(ApiResponse::no_content()) // renvoie Toys[]
            }
          }).collect();
      return ApiResponse::build(content);
    }) {
    Ok(response) => HttpResponse::Ok().json(response),
    Err(error) => HttpResponse::build(error.status).json(error)
  }

In my example I have a first feature on which I can use await but later on when I retrieve the list of child (vector), I want to iter over it to fetch toys from DB still await cannot work as I'm not in an async bloc (didn't find a way to write a functional async block with everything).

This is where I find it not ellegant at all and maybe not efficient either. I only found the block_on method to being able to write something that compile.

NB: My real code is something similar with in fact 3 futures on which I tried a join3 but with a block_on too.

Is there any solution to avoid writing a block like that? Should I use tokio or something else?

Maybe I could return a feature instead of a simple HttpResponse but I didn't succeeded in finding something that compile (actix documentation and some tries on my side didn't pay)

Thanks in advance, feel free to provide any feedback as I'm a beginner in Rust.

How about this?

pub async fn get_my_children_with_toys(
    current_user: AuthenticatedUser,
    db_pool: web::Data<PgPool>,
) -> HttpResponse {
    match UserLight::get_my_children(current_user.user_id, &db_pool).await {
        Ok(api_response) => {
            let content: Vec<FullChildUser> = api_response
                .content
                .into_iter()
                .map(|child| {
                    let child_id = child.id;
                    ChildrenWithToys {
                        child,
                        toys: Toys::get_all(child_id, &db_pool)
                            .await
                            .unwrap_or(ApiResponse::no_content()),
                    }
                })
                .collect();
            
            HttpResponse::Ok().json(ApiResponse::build(content))
        },
        Err(error) => HttpResponse::build(error.status).json(error),
    }
}

A stream is another option that allows one to use loops instead of closures.

pub async fn get_my_children_with_toys(current_user: AuthenticatedUser, db_pool: web::Data<PgPool>) -> HttpResponse {
    match UserLight::get_my_children(current_user.user_id, &db_pool).await {
        Ok(api_response) => {
            let content: Vec<ChildrenWithToys> = Vec::new();
            for child in api_response.content.into_iter() {
                content.push(
                    ChildrenWithToys {
                        child,
                        toys: {
                            let toys = Vec::new();
                            let toy_stream =
                                sqlx::query_as!(Toy, "SELECT * FROM toys WHERE child_id = $1")
                                .bind(child.id)
                                .fetch(&db_pool);
                            while let Some(Ok(toy)) =
                                toy_stream
                                .next()
                                .await
                            {
                                toys.push(toy);
                            }
                            toys
                        }
                    }
                );
            }
            HttpResponse::Ok().json(ApiResponse::build(content))
        },
        Err(error) => HttpResponse::build(error.status).json(error),
    }
}

Thanks for your idea Alice, that was 100% what I wrote at the very beginning.

Unfortunately it's not possible, it seems that as soon as you are using a ".map" it creates a crew "block" So .map(|child| { has to become something like .map(|child| async {
...
And it breaks totally the code.

Thanks that works rich-murphey, like I said to Alice as soon as I discovered the real issue of map async I thought standard loop was my only hope too.

Thank you both, it's working fine now, performance is really achieved.

Anyway I'm a little sad as I thought I could produce a 100% functionnal code with Rust but it seems that I was going against the langage (on that example) and not with my bad!

Why is what I wrote not possible? My suggestion is to not use map.

It's that part that doesn't work, cannot mix streaming map and async in a possible way (with my current knowledge obviously).

It is possible to do it with streams. You do tokio::stream::Iter(content.into_iter()) to obtain a stream. Then use .then instead of .map after importing futures::stream::StreamExt.

Sorry I don't know how to write what you suggested.

When using the stream crate it gave something like that:
tokio::stream::iter(api_response.content) iof type
tokio::stream::Iter<std::vec::IntoIter<UserLight>>

NB: content is of type content: Vec<T>

Therefore the .then() is not available, did I miss something?

The Iter type implements Stream, and then is available on any Stream whenever StreamExt is imported.

Ok done, I had conflict with another futures import (utils I think).

let content = tokio::stream::iter(api_response.content)
.then(|child| async {
... // some other call to async method + await.
}).collect::<Vec>().await; // Lost some time here as it doesn't infer type.

So yes with tokio stream I was able to add to convert map to "then ... async".

I can also confirm that there isn't any performance drawback by using the stream like that, at least nothing noticable with a constant peek of 300 simultaneous users.

Thanks a lot Alice I learned something.

Yes, Alice's solution is far simpler.
The need for "use futures::stream::StreamExt" often isn't obvious just from looking at sample code.
Glad you found what you were looking for!

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.