How do I share a database transaction across trait methods?

I'm using diesel-rs with Postgres as backend, and I want to share a transaction between different trait methods.

The idea is that I have a trait called Repo that abstracts over a database query and its result, with one method:

#[async_trait]
trait Repo<Q> {
    type Response;

    async fn lookup(&mut self, query: Q) -> Result<Self::Response, RepoError>;
}

So the database struct can only be given some query Q if it implements Repo<Q>. This prevents callers from running unplanned, arbitrary queries. It also helps me in testing, since I can swap the backend for a mock by implementing the required trait and swapping the object.
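To make the mock-swapping idea concrete, here is a minimal sketch. It is synchronous to stay dependency-free, and `GetUser`, `MockDb`, `user_name` and the simplified `RepoError` are hypothetical names that are not part of the code in this post.

```rust
use std::collections::HashMap;

// Hypothetical query and error types, for illustration only.
pub struct GetUser(pub u32);

#[derive(Debug, PartialEq)]
pub enum RepoError {
    NotFound,
}

// Synchronous stand-in for the async `Repo<Q>` trait.
pub trait Repo<Q> {
    type Response;

    fn lookup(&mut self, query: Q) -> Result<Self::Response, RepoError>;
}

// In-memory mock backend that can replace Postgres in tests.
#[derive(Default)]
pub struct MockDb {
    pub users: HashMap<u32, String>,
}

impl Repo<GetUser> for MockDb {
    type Response = String;

    fn lookup(&mut self, query: GetUser) -> Result<String, RepoError> {
        self.users.get(&query.0).cloned().ok_or(RepoError::NotFound)
    }
}

// Callers depend only on the trait bound, so any backend that
// implements `Repo<GetUser>` can be passed in.
pub fn user_name<R>(repo: &mut R, id: u32) -> Result<String, RepoError>
where
    R: Repo<GetUser, Response = String>,
{
    repo.lookup(GetUser(id))
}
```

A query type without a matching `Repo<Q>` impl simply fails to compile at the call site, which is the "no unplanned queries" property described above.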

The problem is this: I can use a transaction inside lookup's body

let query_result = self.connection.get()?.transaction(|conn| {
    let record = /* db queries */;

    Ok::<_, RepoError>(record)
})?;

Ok(query_result)

but I'm not sure how I would span this (|conn| { ... }) across two or more Repo implementations.

I'm thinking of something like this

let TrQuery { tr, query } = get_user.lookup(TrQuery::new(tr, user_id)).await?;
...
let TrQuery { tr, query } = get_admin.lookup(TrQuery::new(tr, admin_id)).await?;
...
let TrQuery { tr, query } = get_book.lookup(TrQuery::new(tr, book_id)).await?;

where TrQuery is

pub(crate) struct TrQuery<'qt, Query> {
    pub(crate) tr: &'qt mut PooledConnection<ConnectionManager<PgConnection>>,
    pub(crate) query: Query,
}

This way I can pass the transaction in, receive it back through the result, and pass it on to the next call.
That is, both Q and Self::Response from the Repo trait will be wrapped in TrQuery.


So I thought maybe writing it like this

impl<'qt, Q> Repo<TrQuery<'qt, Q>> for Postgres { ... }

would work, but halfway through the refactor I had already dealt with a ton of lifetime wrangling, which makes me think I'm going about it the wrong way. The application is also multi-threaded, so I have to be more careful.

What would be a better way of sharing transaction between these Repo methods?

Maybe create a separate trait to be implemented only on an object holding a transaction?

let during_transaction = RepoWithTransaction(db.transaction());
during_transaction.do_this();
during_transaction.do_that();

It was my prior approach and what TrQuery is based on.
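For reference, the wrapper idea can be sketched roughly like this. It is a synchronous, std-only sketch: `Conn` is a hypothetical stand-in that just records statements, and the `begin`/`commit` methods are assumptions for illustration, not diesel API.

```rust
// Hypothetical stand-in for a connection; it records what was run on it.
#[derive(Default)]
pub struct Conn {
    pub log: Vec<String>,
}

// Borrows the connection for the duration of one transaction, so every
// method called on the wrapper runs on the same connection, in order.
pub struct RepoWithTransaction<'c> {
    conn: &'c mut Conn,
}

impl<'c> RepoWithTransaction<'c> {
    pub fn begin(conn: &'c mut Conn) -> Self {
        conn.log.push("BEGIN".to_string());
        RepoWithTransaction { conn }
    }

    pub fn do_this(&mut self) {
        self.conn.log.push("this".to_string());
    }

    pub fn do_that(&mut self) {
        self.conn.log.push("that".to_string());
    }

    // Consumes the wrapper, which also releases the borrow of `conn`.
    pub fn commit(self) {
        self.conn.log.push("COMMIT".to_string());
    }
}
```

Because the wrapper holds a plain `&mut`, it cannot outlive the connection, and the borrow checker enforces that only one such wrapper exists at a time.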

I don't quite remember what the final version was, but it looked something like this

struct RepoWithTransaction<'qt, T, Q> where T: Repo<TrQuery<'qt, Q>> {
    pub(crate) tr: &'qt mut PooledConnection<ConnectionManager<PgConnection>>,
    pub(crate) interface: T,
    pub(crate) _m: PhantomData<Q>,
}

The main problem was that tr didn't implement Clone, which was necessary because the lookup method returns a future and the transaction might not live long enough, or at least until the future resolved.

Essentially I needed to copy/clone tr into a de-sugared async move {} block.

Which led me to this

struct RepoWithTransaction<T, Q> where T: Repo<TrQuery<Q>> {
    pub(crate) pool: Pool<ConnectionManager<PgConnection>>,
    pub(crate) interface: T,
    pub(crate) _m: PhantomData<Q>,
}

Pool does implement Clone, and it also resolves the lifetime issue, since I can clone the pool and initiate a transaction inside this async move {} block. But this means I have to call pool.get()?.transaction(...) and pass this ref to interface: T (since I need to chain one transaction across lookup calls). That requires a lifetime annotation here, T: Repo<TrQuery<Q>>, and lookup also needs to return the ref, because I can't copy the ref across calls (sequence matters). So I'm back to this pattern

let TrQuery { tr, query } = get_book.lookup(TrQuery::new(tr, book_id)).await?; :sweat_smile:

It might work eventually if I can resolve all the lifetime issues via refactoring, so there's still a chance. In the meantime I'm hoping for an easier solution :crossed_fingers:

I don't quite understand the issue with the lifetimes that you have. The Transaction will inherently be short-lived, and have a lifetime dependent on the connection it is for.

It's okay to make short-lived futures. The only thing you can't do with them is spawn in tokio, but futures are usable without that. Just await them, don't spawn them. There are combinators like join_all that drive multiple lifetime-bound short-lived futures in parallel without need for any cloning or 'static.
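A small sketch of what "just await them" looks like, assuming a hypothetical `Conn` type and a toy `block_on` that only works for futures that never actually wait (a real executor would replace it):

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical stand-in for a pooled connection; it records its statements.
#[derive(Default)]
pub struct Conn {
    pub log: Vec<String>,
}

// Each future borrows the connection only for as long as the future lives.
pub async fn get_user(conn: &mut Conn, id: u32) -> String {
    conn.log.push(format!("SELECT user {id}"));
    format!("user-{id}")
}

pub async fn get_book(conn: &mut Conn, id: u32) -> String {
    conn.log.push(format!("SELECT book {id}"));
    format!("book-{id}")
}

// Awaiting a future ends its borrow, so the same `&mut Conn` is reused
// without cloning, `Arc`, or handing the reference back in the result.
pub async fn lookup_both(conn: &mut Conn) -> (String, String) {
    let user = get_user(conn, 1).await;
    let book = get_book(conn, 2).await;
    (user, book)
}

// Toy executor for this sketch only: polls in a loop with a no-op waker.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

pub fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}
```

None of these futures is `'static`, and that is fine as long as they are awaited in place rather than spawned.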

I think I overestimated the lifetime troubles; it seems it wasn't an actual issue. This is what I've got after refactoring

type ArcPgTransaction<'rt> = Arc<&'rt mut PooledConnection<ConnectionManager<PgConnection>>>;

pub(crate) struct RepoTransaction<'rt, I> {
    pub(crate) transaction: ArcPgTransaction<'rt>,
    pub(crate) interface: I,
}

impl<'rt, I, Q> Repo<Q> for RepoTransaction<'rt, I>
where
    Q: Send,
    I: Repo<TrQuery<'rt, Q>> + Clone + Send,
{
    type Response = I::Response;

    async fn lookup(&mut self, query: Q) -> Result<Self::Response, RepoError> {
        let transaction = self.transaction.clone();

        self.interface.lookup(TrQuery::new(transaction, query)).await
    }
}

I was able to Clone the transaction by simply putting it in an Arc. Now it isn't moved out of the struct. Now I can just add all the interfaces to this struct or layer them together since as long as I implements Repo everything will work just fine.

Now I'm getting a new error (progress!)

future cannot be sent between threads safely 
within `&mut PooledConnection<ConnectionManager<PgConnection>>`,
the trait `Sync` is not implemented for `NonNull<pq_sys::pg_conn>`

which is by design: see "can't send PooledConnection reference", Issue #2232 in diesel-rs/diesel on GitHub.

One solution that could work would be to refactor lookup to produce a concrete future and remove the async requirement: do all the database work in the method body like any regular function, and return the result in an async move block. The query result does implement Send + Sync. I'll probably need to write a custom future impl, though.
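Roughly, that idea could look like the sketch below. `Conn` is a hypothetical non-`Send` connection (the `Rc` field stands in for the raw libpq pointer), the `String` error type is a placeholder, and `block_on` is a toy executor that only handles futures that are immediately ready.

```rust
use std::future::Future;
use std::pin::{pin, Pin};
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical connection that is deliberately not `Send`.
pub struct Conn {
    _handle: Rc<()>,
}

impl Conn {
    pub fn new() -> Self {
        Conn { _handle: Rc::new(()) }
    }

    pub fn get_result(&mut self, id: u32) -> Result<String, String> {
        Ok(format!("user-{id}"))
    }
}

type BoxFuture<T> = Pin<Box<dyn Future<Output = T> + Send>>;

// Not an `async fn`: the query runs eagerly while `conn` is borrowed, and
// only the owned, `Send` result is moved into the returned future.
pub fn lookup(conn: &mut Conn, id: u32) -> BoxFuture<Result<String, String>> {
    let record = conn.get_result(id);
    Box::pin(async move { record })
}

// Compile-time check that a value is `Send`.
pub fn assert_send<T: Send>(value: T) -> T {
    value
}

// Toy executor for this sketch only.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

pub fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}
```

The trade-off is that the query runs when `lookup` is called, not when the future is first polled, so the work is no longer lazy.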

I'm surprised that Arc<&mut> works for you. This won't let you call methods that take &mut self. If you only need to call &self methods, then you don't need Arc, and &PooledConnection will do.
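To illustrate the limitation with plain std types (`Conn` and `try_execute` are hypothetical names for this sketch): mutable access through an `Arc` is only available via `Arc::get_mut`, which returns `None` as soon as a second clone exists, and `Arc::into_inner` likewise returns `None` unless it is called on the last remaining handle.

```rust
use std::sync::Arc;

// Hypothetical connection with a method that needs `&mut self`.
#[derive(Default)]
pub struct Conn {
    pub queries: u32,
}

impl Conn {
    pub fn execute(&mut self) {
        self.queries += 1;
    }
}

// Returns whether a `&mut self` method could be called through the `Arc`.
pub fn try_execute(shared: &mut Arc<&mut Conn>) -> bool {
    // `Arc::get_mut` yields `Some` only while no other clone is alive.
    match Arc::get_mut(shared) {
        Some(conn) => {
            conn.execute();
            true
        }
        None => false,
    }
}
```

So if a struct keeps one clone and passes another into a call, an `Arc::into_inner(..).unwrap()` on the passed clone would panic rather than yield the `&mut`.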

Lack of Send on the transaction may be a bigger annoyance. How did it work for you previously? It's possible to use non-Send types in Send futures, but only if you don't keep them across .await points. Did you use a single-threaded executor for the transactions, or are transaction methods blocking?
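As a minimal sketch of the "not across .await points" rule, with an `Rc` standing in for a non-`Send` handle (`lookup`, `assert_send` and the toy `block_on` are hypothetical helpers for this example):

```rust
use std::future::{ready, Future};
use std::pin::pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Compile-time check that a value is `Send`.
pub fn assert_send<T: Send>(value: T) -> T {
    value
}

pub async fn lookup(id: u32) -> String {
    // The non-`Send` value lives only inside this block...
    let record = {
        let conn = Rc::new(id); // hypothetical non-`Send` handle
        format!("user-{}", *conn)
    };
    // ...so it is already dropped at the `.await`, and the future stays `Send`.
    ready(()).await;
    record
}

// Toy executor for this sketch only.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

pub fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}
```

Moving the `ready(()).await` inside the inner block, while `conn` is still alive, would make `assert_send(lookup(..))` fail to compile.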

I'm assuming the Arc here / in its struct?

Right now the function looks like this

fn lookup(&mut self, query: Q) -> Self::Future {
   let transaction = self.transaction.clone();

   self.interface.lookup(TrQuery::new(transaction, query))
}

where interface is also a Repo impl on a Postgres struct.

I'm providing transaction to get_result method like so

...
...
let TrQuery {
    transaction,
    query,
} = query;

let record = match query {
    Id::Unique({ ... }) => user::table
        .filter(...)
        .select(User::as_select())
        .get_result(Arc::into_inner(transaction).unwrap()), // `unwrap` is temp
};

async move { Ok(record?) }.boxed() // from `futures` lib

The problem was, since

self.interface.lookup(TrQuery::new(self.transaction, query))

returns a future, the lifetime of both query and transaction needs to be as long if not longer.

query is moved, but self.transaction is bound to self. I'll have to make sure self also has the 'rt lifetime, which I haven't been able to figure out yet without refactoring the trait.

Putting it in an Arc was the simplest, albeit not the most performant, solution. It makes sure the transaction doesn't get dropped until the future resolves. I was also planning on adding a couple more interfaces to the struct, though I'll have to be careful about the exact sequence of the transaction.

And it is! Although I was able to avoid Send requirement by refactoring the query part into a non-async function, and only "send" the actual database response to an async block.

But this is far from ideal. Before this I had entire query in an async fn, which avoids calling and loading data into memory until strictly necessary.

The transaction is indeed kept across await points unfortunately, sharing it across db interfaces necessitates it.

To share a transaction across different methods of your Repo trait, you can use a higher-level construct, such as an async block or an async function, that encapsulates the entire transaction logic. Here's an example:

#[async_trait]
trait Repo<Q>: Sized {
    type Response;

    async fn lookup(&mut self, query: Q) -> Result<Self::Response, RepoError>;

    // Additional trait method for multi-step transactions
    async fn multi_step_transaction<'qt, F, T>(&'qt mut self, func: F) -> Result<T, RepoError>
    where
        F: for<'a> FnOnce(&'a mut PooledConnection<ConnectionManager<PgConnection>>) -> Pin<Box<dyn Future<Output = Result<T, RepoError>> + Send + 'a>>;
}

#[async_trait]
impl<'qt, Q> Repo<TrQuery<'qt, Q>> for Postgres {
    type Response = Q::Response;

    async fn lookup(&mut self, query: TrQuery<'qt, Q>) -> Result<Self::Response, RepoError> {
        self.multi_step_transaction(|conn| {
            let record = /* perform db queries with conn */;

            Box::pin(async move { Ok(record) })
        })
        .await
    }

    async fn multi_step_transaction<'a, F, T>(&'a mut self, func: F) -> Result<T, RepoError>
    where
        F: for<'b> FnOnce(&'b mut PooledConnection<ConnectionManager<PgConnection>>) -> Pin<Box<dyn Future<Output = Result<T, RepoError>> + Send + 'b>>,
    {
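        // Start a transaction.
        // NOTE: `begin`, `commit` and `rollback` are assumed async methods here;
        // diesel's `PooledConnection` exposes the closure-based `transaction` instead.
        let mut tr = self.connection.get()?.begin().await?;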

        // Call the provided function with the transaction
        let result = func(&mut tr).await;

        // Commit or rollback based on the result
        if result.is_ok() {
            tr.commit().await?;
        } else {
            tr.rollback().await?;
        }

        result
    }
}
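For comparison, here is a synchronous, std-only sketch of the closure-based shape that diesel's `transaction(|conn| ...)` from the original question already has: open one transaction, hand the same `&mut` connection to each lookup in turn, and commit or roll back based on the closure's result. `Conn`, `get_user`, `get_book`, `user_and_book` and the `String` error type are hypothetical stand-ins, not diesel types.

```rust
// Hypothetical connection that records the statements it runs.
#[derive(Default)]
pub struct Conn {
    pub log: Vec<String>,
}

impl Conn {
    // Mirrors the shape of a closure-based transaction:
    // commit on `Ok`, roll back on `Err`.
    pub fn transaction<T, E>(
        &mut self,
        f: impl FnOnce(&mut Conn) -> Result<T, E>,
    ) -> Result<T, E> {
        self.log.push("BEGIN".to_string());
        let result = f(self);
        let outcome = if result.is_ok() { "COMMIT" } else { "ROLLBACK" };
        self.log.push(outcome.to_string());
        result
    }
}

// Each lookup takes the connection it should run on as a parameter.
pub fn get_user(conn: &mut Conn, id: u32) -> Result<String, String> {
    conn.log.push(format!("SELECT user {id}"));
    Ok(format!("user-{id}"))
}

pub fn get_book(conn: &mut Conn, id: u32) -> Result<String, String> {
    conn.log.push(format!("SELECT book {id}"));
    if id == 0 {
        Err("no such book".to_string())
    } else {
        Ok(format!("book-{id}"))
    }
}

// Both lookups share one transaction because they share one `&mut Conn`.
pub fn user_and_book(
    conn: &mut Conn,
    user_id: u32,
    book_id: u32,
) -> Result<(String, String), String> {
    conn.transaction(|tx| {
        let user = get_user(tx, user_id)?;
        let book = get_book(tx, book_id)?;
        Ok((user, book))
    })
}
```

With this shape the lookups stay ordinary functions over a borrowed connection, and nothing non-`Send` has to be stored in a struct or kept alive across an `.await`.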