Trait sqlx::PgExecutor with sqlx::Pool and sqlx::Transaction

I want to be able to instantiate Db with both sqlx::Pool<sqlx::Postgres> and sqlx::Transaction<'_, sqlx::Postgres>, so that I can run the same queries either directly against the pool or grouped together within a transaction.

From my understanding, both should implement the trait sqlx::PgExecutor, but when I try to instantiate Db with sqlx::Transaction<'_, sqlx::Postgres> I get the following error:

expected `Pool<Postgres>`, found `Transaction<'_, Postgres>`

Full code:

/// Wrapper around a database executor of type `T`.
///
/// The `where` clause requires a shared reference `&T` to implement
/// `sqlx::PgExecutor` for every lifetime `'a`.
#[derive(Clone)]
pub struct Db<T>
where
    for<'a> &'a T: sqlx::PgExecutor<'a>,
{
    /// The wrapped executor.
    pub executor: T,
}

impl<T> Db<T>
where
    for<'a> &'a T: sqlx::PgExecutor<'a>,
{
    /// Wraps `executor` in a new `Db`, taking ownership of it.
    pub fn new(executor: T) -> Self {
        Self { executor }
    }
}

impl<T> Db<T>
where
    for<'a> &'a T: sqlx::PgExecutor<'a>,
{
    /// Runs `SELECT "id" FROM "user"` through a shared borrow of the wrapped
    /// executor and returns the fetched value as an `i64`.
    ///
    /// # Errors
    ///
    /// Returns the `sqlx::Error` produced by `fetch_one`.
    pub async fn get_user(&self) -> Result<i64, sqlx::Error> {
        sqlx::query_scalar(r#"SELECT "id" FROM "user""#)
            // Only `&self` is available here, so the executor is passed as `&T`.
            .fetch_one(&self.executor)
            .await
    }
}

/// Connects to a local Postgres instance, starts a transaction on the pool and
/// tries to build a `Db` around that transaction.
///
/// # Panics
///
/// Panics with "Could not initialize PgPool" if the pool cannot be created.
pub async fn test() {
    let pool = sqlx::postgres::PgPoolOptions::new()
        .max_connections(100)
        .connect("postgres://postgres:postgres@localhost:5432/db")
        .await
        .expect("Could not initialize PgPool");
    // Pool-backed variant, left disabled:
    //let db = Db::new(pool);
    // Return quietly if the transaction cannot be started.
    let tx = match pool.begin().await {
        Ok(value) => value,
        Err(..) => return,
    };
    // This is the instantiation reported to fail with:
    // expected `Pool<Postgres>`, found `Transaction<'_, Postgres>`
    let db = Db::new(tx);
}

Looks like I figured it out thanks to another post. Is this a good solution?

/// Hands out a Postgres executor borrowed from `self` for the lifetime `'a`.
pub trait DbExecutor<'a> {
    /// The concrete executor type; it must implement `sqlx::Executor` with
    /// `sqlx::Postgres` as its database.
    type Executor: sqlx::Executor<'a, Database = sqlx::Postgres>;

    /// Mutably borrows `self` for `'a` and returns the executor to run a query on.
    fn executor(&'a mut self) -> Self::Executor;
}

/// Wrapper around a database executor of type `T`.
///
/// The struct itself places no bounds on `T`.
#[derive(Clone)]
pub struct Db<T> {
    /// The wrapped executor.
    pub executor: T,
}

/// `DbExecutor` for a `Db` that holds a borrowed pool: the executor handed out
/// is a shared reference to that pool.
impl<'a> DbExecutor<'a> for Db<&sqlx::PgPool> {
    type Executor = &'a sqlx::PgPool;

    fn executor(&'a mut self) -> Self::Executor {
        // The field is itself a `&PgPool`, so this expression is a `&&PgPool`
        // that has to end up as the `&'a PgPool` declared as `Executor`.
        &self.executor
    }
}

/// `DbExecutor` for a `Db` that holds a mutably borrowed connection: the
/// executor handed out is a `&'a mut` reference to that connection.
impl<'a> DbExecutor<'a> for Db<&mut sqlx::PgConnection> {
    type Executor = &'a mut sqlx::PgConnection;

    fn executor(&'a mut self) -> Self::Executor {
        // The field is itself a `&mut PgConnection`; borrowing it through
        // `&'a mut self` yields the shorter-lived `&'a mut PgConnection`
        // declared as `Executor`.
        &mut self.executor
    }
}

impl<T> Db<T> {
    /// Wraps `executor` in a new `Db`, taking ownership of it.
    pub fn new(executor: T) -> Self {
        Self { executor }
    }
}

impl<T> Db<T>
where
    Self: for<'a> DbExecutor<'a>,
{
    /// Runs `SELECT "id" FROM "user"` on the executor obtained from
    /// `DbExecutor::executor` and returns the fetched value as an `i64`.
    ///
    /// # Errors
    ///
    /// Returns the `sqlx::Error` produced by `fetch_one`.
    pub async fn get_user(&mut self) -> Result<i64, sqlx::Error> {
        let id_query = sqlx::query_scalar(r#"SELECT "id" FROM "user""#);
        // Borrow the executor only after the query is built, as in the chained form.
        let executor = self.executor();
        id_query.fetch_one(executor).await
    }
}

/// Connects to a local Postgres instance, starts a transaction on the pool and
/// wraps the transaction's underlying connection in a `Db`.
///
/// # Panics
///
/// Panics with "Could not initialize PgPool" if the pool cannot be created.
pub async fn test() {
    let options = sqlx::postgres::PgPoolOptions::new().max_connections(100);
    let pool = options
        .connect("postgres://postgres:postgres@localhost:5432/db")
        .await
        .expect("Could not initialize PgPool");

    // Pool-backed alternative: let mut db = Db::new(&pool);

    // Return quietly if the transaction cannot be started.
    let Ok(mut tx) = pool.begin().await else {
        return;
    };
    // Reborrow the transaction as `&mut PgConnection` so it matches the
    // `Db<&mut sqlx::PgConnection>` implementation.
    let mut db = Db::new(&mut *tx);
}

I saw this thread earlier, as it's related to a problem I had in the project that motivated the reply you linked to! Thanks for your solution - I like what you've done here and it gives me a couple of ideas.

I made a couple more changes that fixed some lifetime problems for me, and added a helper function to create a transaction from a Db::<sqlx::PgPool>:

/// Wrapper around a database executor of type `T`.
///
/// The struct itself places no bounds on `T`.
#[derive(Clone)]
pub struct Db<T> {
    /// The wrapped executor.
    pub executor: T,
}

impl<T> Db<T> {
    /// Wraps `executor` in a new `Db`, taking ownership of it.
    pub fn new(executor: T) -> Self {
        Self { executor }
    }
}

/// Shorthand for a `sqlx` transaction over Postgres.
type PgTransaction<'a> = sqlx::Transaction<'a, sqlx::Postgres>;

impl Db<sqlx::PgPool> {
    /// Consumes this pool-backed `Db`, begins a transaction on its pool and
    /// returns a `Db` wrapping that transaction.
    ///
    /// The lifetime `'a` appears only in the return type; it is not tied to
    /// any argument.
    ///
    /// # Errors
    ///
    /// Returns the `sqlx::Error` from `begin` if the transaction cannot be started.
    pub async fn transaction<'a>(self) -> Result<Db<PgTransaction<'a>>, sqlx::Error> {
        self.executor.begin().await.map(Db::new)
    }
}

/// Hands out a Postgres executor borrowed from `self` for the lifetime `'a`.
pub trait DbExecutor<'a> {
    /// The concrete executor type; it must implement `sqlx::Executor` with
    /// `sqlx::Postgres` as its database.
    type Executor: sqlx::Executor<'a, Database = sqlx::Postgres>;

    /// Mutably borrows `self` for `'a` and returns the executor to run a query on.
    fn executor(&'a mut self) -> Self::Executor;
}

/// `DbExecutor` for a `Db` that owns its pool: the executor handed out is a
/// shared reference to that pool.
impl<'a> DbExecutor<'a> for Db<sqlx::PgPool> {
    type Executor = &'a sqlx::PgPool;

    fn executor(&'a mut self) -> Self::Executor {
        // A shared borrow of the owned pool is returned even though `self` is
        // taken mutably to satisfy the trait signature.
        &self.executor
    }
}

/// `DbExecutor` for a `Db` that owns a transaction: the executor handed out is
/// a mutable reference to the `sqlx::PgConnection` the transaction derefs to.
impl<'a> DbExecutor<'a> for Db<PgTransaction<'_>> {
    type Executor = &'a mut sqlx::PgConnection;

    fn executor(&'a mut self) -> Self::Executor {
        // Explicit deref + reborrow: goes from the transaction to its
        // connection, borrowed for `'a`.
        &mut *self.executor
    }
}

impl<T> Db<T>
where
    Self: for<'a> DbExecutor<'a>,
{
    /// Runs `SELECT "id" FROM "user"` on the executor obtained from
    /// `DbExecutor::executor` and returns the fetched value as an `i64`.
    ///
    /// # Errors
    ///
    /// Returns the `sqlx::Error` produced by `fetch_one`.
    pub async fn get_user(&mut self) -> Result<i64, sqlx::Error> {
        let id_query = sqlx::query_scalar(r#"SELECT "id" FROM "user""#);
        // Borrow the executor only after the query is built, as in the chained form.
        let executor = self.executor();
        id_query.fetch_one(executor).await
    }
}

/// Connects to a local Postgres instance, wraps the pool in a `Db` and turns
/// it into a transaction-backed `Db` via `Db::transaction`.
///
/// # Panics
///
/// Panics with "Could not initialize PgPool" if the pool cannot be created.
pub async fn test() {
    let options = sqlx::postgres::PgPoolOptions::new().max_connections(100);
    let pool = options
        .connect("postgres://postgres:postgres@localhost:5432/db")
        .await
        .expect("Could not initialize PgPool");

    // `transaction` consumes the pool-backed `Db`; return quietly if it fails.
    let Ok(mut db) = Db::new(pool).transaction().await else {
        return;
    };
}

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.