Actix/Sqlx await without block_on for chaining

Hi there,

I'm facing an issue with my actix + sqlx code. I suspect it isn't performant at all, because I have to use block_on, yet I can't get the code to compile without it.

Here is an example:

pub async fn get_my_children_with_toys(current_user: AuthenticatedUser, db_pool: web::Data<PgPool>) -> HttpResponse {
  match UserLight::get_my_children(current_user.user_id, &db_pool)
    .await
    .map(|api_response| {
      let content: Vec<FullChildUser> =
        api_response.content
          .into_iter()
          .map(|child| {
            let child_id = child.id;
            ChildrenWithToys {
              child,
              toys: tokio::runtime::Runtime::new().unwrap().block_on(
                     Toys::get_all(child_id, &db_pool)
                  ).unwrap_or(ApiResponse::no_content()) // returns Toys[]
            }
          }).collect();
      return ApiResponse::build(content);
    }) {
    Ok(response) => HttpResponse::Ok().json(response),
    Err(error) => HttpResponse::build(error.status).json(error)
  }
}

In my example, I can use await on the first future, but later, when I retrieve the list of children (a vector), I want to iterate over it to fetch each child's toys from the DB. There, await doesn't work because I'm no longer in an async block (I didn't find a way to write a functional-style async block that covers everything).

This is where I find it not elegant at all, and probably not efficient either. block_on was the only way I found to write something that compiles.

NB: My real code is similar but actually has 3 futures, on which I tried join3, but again inside a block_on.

Is there any solution to avoid writing a block like that? Should I use tokio or something else?

Maybe I could return a future instead of a plain HttpResponse, but I didn't succeed in finding something that compiles (neither the actix documentation nor my own attempts paid off).

Thanks in advance, feel free to provide any feedback as I'm a beginner in Rust.

How about this?

pub async fn get_my_children_with_toys(
    current_user: AuthenticatedUser,
    db_pool: web::Data<PgPool>,
) -> HttpResponse {
    match UserLight::get_my_children(current_user.user_id, &db_pool).await {
        Ok(api_response) => {
            let content: Vec<FullChildUser> = api_response
                .content
                .into_iter()
                .map(|child| {
                    let child_id = child.id;
                    ChildrenWithToys {
                        child,
                        toys: Toys::get_all(child_id, &db_pool)
                            .await
                            .unwrap_or(ApiResponse::no_content()),
                    }
                })
                .collect();
            
            HttpResponse::Ok().json(ApiResponse::build(content))
        },
        Err(error) => HttpResponse::build(error.status).json(error),
    }
}

A stream is another option that allows one to use loops instead of closures.

use futures::StreamExt; // brings .next() into scope for streams

pub async fn get_my_children_with_toys(current_user: AuthenticatedUser, db_pool: web::Data<PgPool>) -> HttpResponse {
    match UserLight::get_my_children(current_user.user_id, &db_pool).await {
        Ok(api_response) => {
            let mut content: Vec<ChildrenWithToys> = Vec::new();
            for child in api_response.content.into_iter() {
                // Fetch the toys first, before `child` is moved into the struct.
                let mut toys = Vec::new();
                {
                    // query_as! takes its bind arguments inside the macro call.
                    let mut toy_stream =
                        sqlx::query_as!(Toy, "SELECT * FROM toys WHERE child_id = $1", child.id)
                        .fetch(db_pool.get_ref());
                    // Note: the loop stops silently at the first row that yields an error.
                    while let Some(Ok(toy)) = toy_stream.next().await {
                        toys.push(toy);
                    }
                }
                content.push(ChildrenWithToys { child, toys });
            }
            HttpResponse::Ok().json(ApiResponse::build(content))
        },
        Err(error) => HttpResponse::build(error.status).json(error),
    }
}

Thanks for your idea Alice, that was 100% what I wrote at the very beginning.

Unfortunately it's not possible: as soon as you use ".map", it creates a new "block" (a closure), so .map(|child| { has to become something like .map(|child| async {
...
and that totally breaks the code.
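
For what it's worth, here is a minimal sketch of what happens with .map(|child| async { ... }): the closure no longer returns a value, it returns a future per element, and each of those still has to be awaited somewhere. Everything below is an assumption made for illustration: get_toys is a hypothetical stand-in for Toys::get_all, ChildWithToys is a simplified struct, and block_on is a tiny std-only executor used only so the sketch runs without actix or tokio (in a real handler you would already be inside an async fn).

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Minimal stand-in executor (assumption: only here to run the sketch).
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        // The futures in this sketch are always ready, so polling in a loop is fine.
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

// Hypothetical stand-in for Toys::get_all; a real version would query the pool.
async fn get_toys(child_id: u32) -> Result<Vec<String>, String> {
    if child_id == 0 {
        Err("unknown child".to_string())
    } else {
        Ok(vec![format!("toy-of-{}", child_id)])
    }
}

#[derive(Debug, PartialEq)]
struct ChildWithToys {
    child_id: u32,
    toys: Vec<String>,
}

async fn children_with_toys(child_ids: Vec<u32>) -> Vec<ChildWithToys> {
    // The closure is not async itself; it only builds one future per child.
    let pending: Vec<_> = child_ids
        .into_iter()
        .map(|child_id| async move {
            // Fall back to an empty list on error, like unwrap_or in the question.
            let toys = get_toys(child_id).await.unwrap_or_default();
            ChildWithToys { child_id, toys }
        })
        .collect();

    // Nothing has run yet; each future does its work only when awaited.
    let mut content = Vec::with_capacity(pending.len());
    for fut in pending {
        content.push(fut.await);
    }
    content
}
```

Calling block_on(children_with_toys(vec![1, 0, 2])) from main gives three entries, with an empty toy list for id 0. The futures are awaited one after another here, so this behaves like the plain for loop; it is only meant to show why the async closure alone is not enough.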

Thanks rich-murphey, that works. Like I said to Alice, as soon as I discovered the real issue with async inside map, I thought a standard loop was my only hope too.

Thank you both, it's working fine now, and performance is really good.

Anyway, I'm a little sad: I thought I could write 100% functional code in Rust, but it seems that (in this example) I was working against the language rather than with it. My bad!

Why is what I wrote not possible? My suggestion is to not use map.

It's that part that doesn't work: I can't find a way to mix a streaming map with async (with my current knowledge, obviously).

It is possible to do it with streams. You do tokio::stream::iter(content.into_iter()) to obtain a stream. Then use .then instead of .map after importing futures::stream::StreamExt.

Sorry I don't know how to write what you suggested.

When using the stream module, I got something like this:
tokio::stream::iter(api_response.content) is of type
tokio::stream::Iter<std::vec::IntoIter<UserLight>>

NB: content is of type Vec<T>.

Therefore the .then() is not available, did I miss something?

The Iter type implements Stream, and then is available on any Stream whenever StreamExt is imported.

OK, done. I had a conflict with another futures import (utils, I think).

let content = tokio::stream::iter(api_response.content)
    .then(|child| async {
        ... // some other call to an async method + await.
    })
    .collect::<Vec<_>>()
    .await; // Lost some time here as it doesn't infer the type.

So yes, with tokio's stream I was able to convert map to "then ... async".

I can also confirm that there isn't any performance drawback to using the stream like that, at least nothing noticeable at a constant peak of 300 simultaneous users.

Thanks a lot Alice, I learned something.

Yes, Alice's solution is far simpler.
The need for "use futures::stream::StreamExt" often isn't obvious just from looking at sample code.
Glad you found what you were looking for!