Questions about sqlx fetch_many, and transforming streams

So this is my starting point:

struct Person {
    id: i32,
    name: String,
    age: i32

pub struct Database {
    pool: Pool<Postgres>,

impl Database {
    pub async fn get_person_stream(&self) -> BoxStream<Result<sqlx::Either<PgQueryResult, Person>, sqlx::Error>> {

        let result = sqlx::query_as!(
        "select * from person")


async fn main() -> anyhow::Result<()> {

    let pool = PgPoolOptions::new()

    let db = Database {

    let mut persons = db.get_person_stream().await;
    while let Some(next) = persons.next().await {
        match next {
            Ok(either) => {
                match either {
                    Either::Left(_) => {
                        dbg!("got left");
                    Either::Right(right) => {
            Err(err) => {


The above works. Now I want to switch from query_as! to query!. One reason is that I want the result of the query as a tuple that contains only a subset of the columns in the table.

But trying to do this led to the error:

24 |         result
   |         ^^^^^^ expected `Person`, found `Record`
   = note: expected struct `Pin<Box<dyn Stream<Item = Result<sqlx::Either<PgQueryResult, Person>, sqlx::Error>> + std::marker::Send>>`
              found struct `Pin<Box<dyn Stream<Item = Result<sqlx::Either<PgQueryResult, Record>, sqlx::Error>> + std::marker::Send>>`

This is to be expected, but the problem now is that I cannot find a way to import the Record type.

So the first question:

1. How do I specify the return type as the sqlx Record type?

The next thing I tried was to convert each record to the subset I want returned. Let's say just the name and age, ignoring the id.

For that I came up with the following code, which works:

impl Database {
    pub async fn get_person_stream(&self) -> Result<Vec<(std::string::String, i32)>, sqlx::Error> {

        let result = sqlx::query!(
        "select * from person")
            .try_filter_map(|result| async {
                Ok(result.right().map(|record| {
                    (record.name, record.age)


But now I am wondering whether this makes sense. If I am already collecting into a Vec, there will be an allocation and the whole result set will be loaded into memory. Does this not defeat the entire purpose of using a streaming API?
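To make the concern concrete, here is a minimal synchronous sketch of the same distinction using plain std iterators instead of sqlx streams. Row, rows, eager and lazy are hypothetical stand-ins invented for illustration, not sqlx types; the counter only records how many rows the source has produced.

```rust
use std::cell::Cell;

/// Hypothetical stand-in for a database row; not an sqlx type.
struct Row {
    name: String,
    age: i32,
}

/// Hypothetical row source that counts how many rows it has produced so far.
fn rows(produced: &Cell<usize>) -> impl Iterator<Item = Row> + '_ {
    (0..1000).map(move |i| {
        produced.set(produced.get() + 1);
        Row {
            name: format!("person-{}", i),
            age: 20 + (i % 50),
        }
    })
}

/// Eager: every row is produced and stored before the caller sees any of them.
fn eager(produced: &Cell<usize>) -> Vec<(String, i32)> {
    rows(produced).map(|r| (r.name, r.age)).collect()
}

/// Lazy: the mapping is only registered; rows are produced when the caller pulls.
fn lazy(produced: &Cell<usize>) -> impl Iterator<Item = (String, i32)> + '_ {
    rows(produced).map(|r| (r.name, r.age))
}
```

Calling eager drives the source to completion, while taking a single item from lazy pulls only one row from the source. The Vec-returning version of get_person_stream behaves like eager.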

So the next question:

2. How do I register some transformations on the stream without consuming it, i.e. so that get_person_stream now returns BoxStream<Result<sqlx::Either<PgQueryResult, Vec<(String, i32)>>, sqlx::Error>>, which lets the calling code still have a stream to process?
