Questions about sqlx fetch_many, and transforming streams

This is my starting point:

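use futures::stream::BoxStream;
use futures::StreamExt; // provides `.next()` on streams
use sqlx::postgres::{PgPoolOptions, PgQueryResult};
use sqlx::{Either, Pool, Postgres};

#[derive(Debug)]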
struct Person {
    id: i32,
    name: String,
    age: i32
}

pub struct Database {
    pool: Pool<Postgres>,
}

impl Database {
    pub async fn get_person_stream(&self) -> BoxStream<Result<sqlx::Either<PgQueryResult, Person>, sqlx::Error>> {

        let result = sqlx::query_as!(
        Person,
        "select * from person")
            .fetch_many(&self.pool);

        result
    }
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {

    let pool = PgPoolOptions::new()
        .max_connections(5)
        .connect("postgresql://me:password@localhost:5432/learn_sqlx").await?;

    let db = Database {
        pool,
    };

    let mut persons = db.get_person_stream().await;
    while let Some(next) = persons.next().await {
        match next {
            Ok(either) => {
                match either {
                    Either::Left(_) => {
                        dbg!("got left");
                    }
                    Either::Right(right) => {
                        dbg!(&right);
                    }
                }
            }
            Err(err) => {
                dbg!(&err);
            }
        }
    }

    Ok(())
}

The above works. Now I want to switch from query_as! to query!. One reason could be that I want the result of the query as a tuple that contains a subset of the columns in the table.

But trying to do this leads to the error:

   |
24 |         result
   |         ^^^^^^ expected `Person`, found `Record`
   |
   = note: expected struct `Pin<Box<dyn Stream<Item = Result<sqlx::Either<PgQueryResult, Person>, sqlx::Error>> + std::marker::Send>>`
              found struct `Pin<Box<dyn Stream<Item = Result<sqlx::Either<PgQueryResult, Record>, sqlx::Error>> + std::marker::Send>>`

This is to be expected, but the problem now is that I cannot find a way to import the Record type.

So the first question:

1. How do I specify the return type as the sqlx Record type?
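My current understanding, which may be wrong, is that query! generates an ad hoc struct at the call site, so there is no path to import it from. The sketch below imitates that with a made-up macro (anon_record! is hypothetical and is not sqlx's real expansion). It uses only std and shows why the only thing I can put in a signature is a type I define or a tuple:

```rust
// Hypothetical macro imitating a macro-generated, call-site-local struct.
// This is an illustration only, NOT what sqlx::query! actually expands to.
macro_rules! anon_record {
    ($name:expr, $age:expr) => {{
        // `Record` exists only inside this block, so code outside
        // the block cannot write its name in a type annotation.
        struct Record {
            name: String,
            age: i32,
        }
        Record {
            name: String::from($name),
            age: $age,
        }
    }};
}

// The return type cannot mention `Record`; a tuple (or a struct
// declared at module level) has to be used instead.
pub fn name_and_age() -> (String, i32) {
    let record = anon_record!("Ada", 36);
    // Field access still works because the value's type is inferred.
    (record.name, record.age)
}
```

If that understanding is right, the fields stay usable inside the function, but the value has to be converted before it crosses the function boundary.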

The next thing I tried was to convert each row to the subset I want returned, say just the name and age, ignoring the id.

For that I came up with the following code, which works:

impl Database {
    pub async fn get_person_stream(&self) -> Result<Vec<(String, i32)>, sqlx::Error> {

        let result = sqlx::query!(
        "select * from person")
            .fetch_many(&self.pool)
            .try_filter_map(|result| async {
                Ok(result.right().map(|record| {
                    (record.name, record.age)
                }))
            }).try_collect().await;

        result
    }
}

But now I am wondering if this makes sense. If I am already collecting into a Vec, there will be an allocation, and the results will be loaded entirely into memory. Does this not defeat the entire purpose of using a streaming API?

So the next question:

2. How do I register some transformations on the stream without consuming it, i.e. so that get_person_stream now returns BoxStream<Result<sqlx::Either<PgQueryResult, Vec<(String, i32)>>, sqlx::Error>>, which lets the code that calls this method still have a stream to process?
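To illustrate what I mean by transforming without consuming, here is a std-only sketch that uses a plain Iterator as a stand-in for the stream. QueryResult, Record, Either and sample_rows are placeholders I made up for the example, not the sqlx types. I assume the async version would have the same shape, with try_filter_map from futures' TryStreamExt followed by .boxed() in place of filter_map, but I have not verified that against sqlx:

```rust
// Std-only stand-ins for illustration; these are NOT the sqlx types.
#[derive(Debug)]
pub struct QueryResult; // placeholder for PgQueryResult

#[derive(Debug)]
pub struct Record {
    pub id: i32,
    pub name: String,
    pub age: i32,
}

#[derive(Debug)]
pub enum Either<L, R> {
    Left(L),
    Right(R),
}

pub type Item = Result<Either<QueryResult, Record>, String>;

/// Registers the transformation lazily: nothing is pulled from `rows`
/// until the caller asks for the next element. `Left` items are dropped,
/// `Right` records become (name, age), and errors are passed through.
pub fn name_and_age<I>(rows: I) -> impl Iterator<Item = Result<(String, i32), String>>
where
    I: Iterator<Item = Item>,
{
    rows.filter_map(|item| match item {
        Ok(Either::Right(record)) => Some(Ok((record.name, record.age))),
        Ok(Either::Left(_)) => None,
        Err(err) => Some(Err(err)),
    })
}

/// Made-up sample data standing in for what the database would yield.
pub fn sample_rows() -> Vec<Item> {
    vec![
        Ok(Either::Right(Record { id: 1, name: "Ada".to_string(), age: 36 })),
        Ok(Either::Left(QueryResult)),
        Ok(Either::Right(Record { id: 2, name: "Linus".to_string(), age: 28 })),
    ]
}
```

The caller would then drive it with something like `for row in name_and_age(sample_rows().into_iter()) { ... }`, so only one transformed row exists at a time and no intermediate Vec of results is built.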
