So this is my starting point
#[derive(Debug)]
struct Person {
id: i32,
name: String,
age: i32
}
pub struct Database {
pool: Pool<Postgres>,
}
impl Database {
pub async fn get_person_stream(&self) -> BoxStream<Result<sqlx::Either<PgQueryResult, Person>, sqlx::Error>> {
let result = sqlx::query_as!(
Person,
"select * from person")
.fetch_many(&self.pool);
result
}
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let pool = PgPoolOptions::new()
.max_connections(5)
.connect("postgresql://me:password@localhost:5432/learn_sqlx").await?;
let db = Database {
pool,
};
let mut persons = db.get_person_stream().await;
while let Some(next) = persons.next().await {
match next {
Ok(either) => {
match either {
Either::Left(_) => {
dbg!("got left");
}
Either::Right(right) => {
dbg!(&right);
}
}
}
Err(err) => {
dbg!(&err);
}
}
}
Ok(())
}
The above works. Now I want to switch from query_as!
to query
. One request could be because I want to result of the query as a tuple that contains a subset of the columns in the table.
But trying to do this lead to the error:
|
24 | result
| ^^^^^^ expected `Person`, found `Record`
|
= note: expected struct `Pin<Box<dyn Stream<Item = Result<sqlx::Either<PgQueryResult, Person>, sqlx::Error>> + std::marker::Send>>`
found struct `Pin<Box<dyn Stream<Item = Result<sqlx::Either<PgQueryResult, Record>, sqlx::Error>> + std::marker::Send>>`
Which is to be expected, but the problem now is, I cannot seem to find a way to import Record
type.
So the first question:
1 . How do I specify the return type as sqlx Record type?
The next thing I tried next to convert to the subset I want returned. So let's say just the name and age ignoring the Id.
For that I came up with the following code that works
impl Database {
pub async fn get_person_stream(&self) -> Result<Vec<(std::string::String, i32)>, sqlx::Error> {
let result = sqlx::query!(
"select * from person")
.fetch_many(&self.pool)
.try_filter_map(|result| async {
Ok(result.right().map(|record| {
(record.name, record.age)
}))
}).try_collect().await;
result
}
}
But now I am wondering if this makes sense. If I am already mapping to a Vec
, that means there will be allocation, and results would be fed up totally into memory. Does this not defeat the entire purpose of using a streaming api?.
So the next question
- How do I register some transformations on the stream but not consume it, ie so that the
get_person_stream
now returnsBoxStream<Result<sqlx::Either<PgQueryResult, Vec<(String, i32)>>, sqlx::Error>>
, which enables the code that calls this method to still have a stream to process.