How to pass sqlx::Executor to nested futures

I'm trying to make functions work with both PgPool and PgTransaction. The PgExecutor seems to be meant for this. But I can't understand how to pass it around. It's implemented for PgPool, which is Clone and PgConnection which isn't, so I can't just add the Clone bound.

Here's a runnable example:

use sqlx::{PgExecutor, PgPool};

#[tokio::main]
async fn main() -> Result<(), sqlx::Error> {
    let pool = PgPool::connect("postgres:///").await?;

    let mut tx = pool.begin().await?;
    outer(&mut *tx).await?;
    tx.commit().await
}

async fn outer(db: impl PgExecutor<'_>) -> sqlx::Result<()> {
    dbg!(inner(db, "first").await?);

    // The second invocation doesn't compile:
    // use of moved value: `db`  value used here after move
    dbg!(inner(db, "second").await?);

    Ok(())
}

async fn inner(db: impl PgExecutor<'_>, name: &str) -> sqlx::Result<String> {
    sqlx::query_scalar!(r#"SELECT $1 as "name!""#, name)
        .fetch_one(db)
        .await
}

The question is also posted on StackOverflow

Please add links to cross posts you made on other sites to avoid duplicated effort by the community.

Reborrowing from the &mut PgConnection you pass to outer compiles:

use sqlx::{PgConnection, PgExecutor, PgPool};

#[tokio::main]
async fn main() -> Result<(), sqlx::Error> {
    let pool = PgPool::connect("postgres:///").await?;

    let mut tx = pool.begin().await?;
    outer(&mut *tx).await?;
    tx.commit().await
}

async fn outer(db: &mut PgConnection) -> sqlx::Result<()> {
    dbg!(inner(&mut *db, "first").await?);

    // The second invocation doesn't compile:
    // use of moved value: `db`  value used here after move
    dbg!(inner(&mut *db, "second").await?);

    Ok(())
}

async fn inner(db: impl PgExecutor<'_>, name: &str) -> sqlx::Result<String> {
    sqlx::query_scalar!(r#"SELECT $1 as "name!""#, name)
        .fetch_one(db)
        .await
}

Just note that with this solution using mutable references, you have to abstract your tasks and transactions in such a way that one task does n transactions. You can never have 1 transaction span n tasks due to the 'static bound on the task. Cloning Pools and passing a clone to each task is no problem though, it's actually the intended use-case of it.

I'm trying to make both outer and inner functions compatible with PgConnections and PgPool.

You can use Acquire for this:

use sqlx::{Acquire, PgExecutor, PgPool, Postgres};

#[tokio::main]
async fn main() -> Result<(), sqlx::Error> {
    let pool = PgPool::connect("postgres:///").await?;

    let mut tx = pool.begin().await?;
    outer(&mut *tx).await?;
    tx.commit().await
}

async fn outer(db: impl Acquire<'_, Database = Postgres>) -> sqlx::Result<()> {
    let mut connection = db.acquire().await?;

    dbg!(inner(&mut *connection, "first").await?);

    // The second invocation doesn't compile:
    // use of moved value: `db`  value used here after move
    dbg!(inner(&mut *connection, "second").await?);

    Ok(())
}

async fn inner(db: impl PgExecutor<'_>, name: &str) -> sqlx::Result<String> {
    sqlx::query_scalar!(r#"SELECT $1 as "name!""#, name)
        .fetch_one(db)
        .await
}
1 Like

Thanks a lot, exactly what I needed :pray:

1 Like