Is this possible?

What I need in a new project is:

  1. Start the project with SQLite or with PostgreSQL, chosen by a CLI argument

  2. Have a "clean" architecture to be able to scale (the project is really big in terms of the number of methods, but they are all almost the same: HTTP req -> read from DB -> some small CPU work -> write to DB -> HTTP response)

  3. Use the full power of the async runtime (tokio), i.e. get the Send + Sync bounds right so that the work can run correctly across threads and green threads (tasks)

  4. Be able to call Service AND Repository methods both with and without the same DB transaction

  5. Be able to call other services' methods with that same DB transaction

  6. The DB transaction can begin() in a Service method or in a Repository method (there is no rule here)

  7. The rollback of the transaction is automatic if the commit() method is not called (because of an error in the middle of the work)

Note: I know that DDD, and even the clean architecture, advise against calling other services within the same transaction, but I need to, and the methods are all fast and live in the same executable (for now).

I think that for these requirements I need to create an abstract DB interface, which I tried to do in the code below.

As you can see, the errors all come from the abstract DB interface I'm trying to create: I don't know how to write it!

Can you help me?

Code in Rust Playground

/*
[dependencies]
async-trait = { version = "0.1.81", default-features = false }
sqlx = { version = "0.7.4", default-features = false, features = [
    "macros",
    "postgres",
    "sqlite",
    "runtime-tokio",
] }
tokio = { version = "1.38.0", default-features = false, features = [
    "signal",
    "macros",
    "process",
    "rt-multi-thread",
] }
*/

use async_trait::async_trait;
use sqlx::{Error, Executor, PgPool, Postgres, Sqlite, SqlitePool, Transaction};

#[async_trait]
pub trait Database: Send + Sync {
    type Connection: Executor<'_> + Send;
    type Transaction<'a>: Executor<'a, Database = Self::Connection> + Send + 'a
    where
        Self: 'a;

    async fn begin(&self) -> Result<Self::Transaction<'_>, Error>;
    async fn commit(&self) -> Result<Self::Transaction<'_>, Error>;
}

pub struct PgDatabase {
    pool: PgPool,
}

impl PgDatabase {
    pub fn new(pool: PgPool) -> Self {
        Self { pool }
    }
}

#[async_trait]
impl Database for PgDatabase {
    type Connection = PgPool;
    type Transaction<'a> = Transaction<'a, Postgres>;

    async fn begin(&self) -> Result<Self::Transaction<'_>, Error> {
        self.pool.begin().await
    }

    async fn commit(&self) -> Result<(), Error> {
        self.transaction.commit().await
    }
}

pub struct SqliteDatabase {
    pool: SqlitePool,
}

impl SqliteDatabase {
    pub fn new(pool: SqlitePool) -> Self {
        Self { pool }
    }
}

#[async_trait]
impl Database for SqliteDatabase {
    type Connection = SqlitePool;
    type Transaction<'a> = Transaction<'a, Sqlite>;

    async fn begin(&self) -> Result<Self::Transaction<'_>, Error> {
        self.pool.begin().await
    }

    async fn commit(&self) -> Result<(), Error> {
        self.transaction.commit().await
    }
}

/// One row of the `kernel_namespaces` table, as produced by the
/// `SELECT *` query in `Repository::find_namespace_by_id`.
#[derive(sqlx::FromRow, Debug, Clone)]
pub struct Namespace {
    /// Value of the `id` column; the repository query filters on it.
    pub id: String,
    /// Value of the `membership` column.
    /// NOTE(review): its domain meaning is not visible in this file.
    pub membership: String,
}

/// Data-access layer, generic over the [`Database`] handle it stores.
pub struct Repository<DB: Database> {
    // Used to open a transaction when the caller supplies no executor.
    db: DB,
}

impl<DB: Database> Repository<DB> {
    /// Builds a repository around `db`.
    pub fn new(db: DB) -> Self {
        Self { db }
    }

    /// Loads the `kernel_namespaces` row whose `id` equals `namespace_id`.
    ///
    /// `executor` is an optional caller-supplied executor (for example a
    /// transaction the caller already opened); the intent of the `match`
    /// below is to fall back to a transaction from `self.db` when it is
    /// `None`.
    ///
    /// # Errors
    ///
    /// Propagates any sqlx error from `begin` or from the query, and returns
    /// `Error::RowNotFound` when no row matches.
    ///
    /// NOTE(review): this method cannot compile as written; fixing it needs a
    /// decision about the executor contract that spans the whole file:
    /// - the `Some(db)` arm evaluates to `executor` (the `Option`, already
    ///   moved by the match) instead of the bound `db`, while the `None` arm
    ///   evaluates to a transaction, so the two arms have different types;
    /// - the `db` binding produced by the match is never used, and the
    ///   `Option<E>` itself is handed to `fetch_optional`;
    /// - a transaction begun in the `None` arm is never committed here;
    /// - `E` is pinned to `Database = sqlx::Any`, whereas `self.db` is a
    ///   backend-specific handle — presumably these need to agree (and
    ///   `sqlx::Any` presumably requires sqlx's `any` cargo feature, which
    ///   the dependency list in the header comment does not enable — confirm).
    pub async fn find_namespace_by_id<'a, E>(
        &self,
        executor: Option<E>,
        namespace_id: &str,
    ) -> Result<Namespace, Error>
    where
        E: Executor<'a, Database = sqlx::Any> + Send,
    {
        // Intended: use the caller's executor, otherwise start our own transaction.
        let db = match executor {
            Some(db) => executor,
            None => self.db.begin().await?,
        };

        // `$1` is bound to `namespace_id` below rather than interpolated into the SQL.
        let query = "SELECT * FROM kernel_namespaces WHERE id = $1";

        // `fetch_optional` yields `None` for "no row"; that is mapped to `RowNotFound`.
        let namespace = sqlx::query_as::<_, Namespace>(query)
            .bind(namespace_id)
            .fetch_optional(executor)
            .await?
            .ok_or(Error::RowNotFound)?;

        Ok(namespace)
    }
}

pub struct Service<DB: Database> {
    repo: Repository<DB>,
    db: DB,
    other_service: OtherService<DB>,
}

impl<DB: Database> Service<DB> {
    pub fn new(repo: Repository<DB>, db: DB, other_service: OtherService<DB>) -> Self {
        Self {
            repo,
            db,
            other_service,
        }
    }

    pub async fn find_namespace_and_membership(&self, namespace_id: &str) -> Result<String, Error> {
        let mut tx = self.db.begin().await?;

        let namespace = self
            .repo
            .find_namespace_by_id(Some(tx), namespace_id)
            .await?;

        // Perform other operations within the transaction

        tx.commit().await?;

        Ok(namespace.membership)
    }

    pub async fn other_service_method(
        &self,
        executor: Option<E>,
        namespace_id: &str,
    ) -> Result<(), Error>
    where
        E: Executor<'a, Database = sqlx::Any> + Send,
    {
        let db = match executor {
            Some(db) => executor,
            None => self.db.begin(),
        };

        let _ = self.repo.find_namespace_by_id(db, namespace_id).await?;

        // Perform other operations

        Ok(())
    }
}

pub struct OtherService<DB: Database> {
    repo: Repository<DB>,
    db: DB,
}

impl<DB: Database> OtherService<DB> {
    pub fn new(repo: Repository<DB>, db: DB) -> Self {
        Self { repo, db }
    }

    pub async fn get_beautiful_things(
        &self,
        executor: Option<E>,
        namespace_id: &str,
    ) -> Result<(), Error>
    where
        E: Executor<'a, Database = sqlx::Any> + Send,
    {
        let db = match executor {
            Some(db) => executor,
            None => self.db.begin(),
        };

        let _ = self.repo.find_namespace_by_id(db, namespace_id).await?;

        // Perform other operations

        Ok(())
    }
}

#[tokio::main]
async fn main() {
    let pg_pool = PgPool::connect("postgres://user:password@localhost/db")
        .await
        .unwrap();
    let db = PgDatabase::new(pg_pool);

    let repo = Repository::new(db);

    let other_service = OtherService::new(repo, db);

    let service = Service::new(repo, db, other_service);

    match service.find_namespace_and_membership("namespace_id").await {
        Ok(membership) => println!("Membership: {}", membership),
        Err(e) => eprintln!("Error: {}", e),
    }

    match service.other_service_method(None, "namespace_id").await {
        Ok(_) => println!("Success"),
        Err(e) => eprintln!("Error: {}", e),
    }

    other_service
        .get_beautiful_things(None, "namespace_id")
        .await
        .unwrap();
}

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.