What I need in a new project is:
-
Start the project with SQLite or with Postgresql based on a CLI argument
-
Have a "clean" architecture to be able to scale (the project is really big in terms of the number of methods, but they are all almost the same: HTTP req -> read from DB -> some small CPU work -> write to DB -> HTTP response)
-
Use the full power of the async runtime (tokio), which means using the correct Send + Sync bounds so that the code works correctly across both OS threads and green threads (tasks)
-
Be able to call Service AND Repository methods both with and without the same DB transaction
-
Be able to call other services methods with that same DB transaction
-
The DB transaction can `begin()` in a Service method or in a Repository method (there is no rule here)
-
The rollback of the transaction is automatic if the `commit()` method is not called (because of an error in the middle of the work)
Note: I know that DDD or even the clean architecture suggests against calling other services within the same transaction, but I need to and the methods are all fast and in the same executable (for now).
I think that for these requirements I need to create an abstract DB interface; my attempt is in the code below.
As you can see, all the errors come from the abstract DB interface I'm trying to create: I don't know how to write it!
Can you help me?
/*
[dependencies]
async-trait = { version = "0.1.81", default-features = false }
sqlx = { version = "0.7.4", default-features = false, features = [
"macros",
"postgres",
"sqlite",
"runtime-tokio",
] }
tokio = { version = "1.38.0", default-features = false, features = [
"signal",
"macros",
"process",
"rt-multi-thread",
] }
*/
use async_trait::async_trait;
use sqlx::{Error, Executor, PgPool, Postgres, Sqlite, SqlitePool, Transaction};
#[async_trait]
pub trait Database: Send + Sync {
type Connection: Executor<'_> + Send;
type Transaction<'a>: Executor<'a, Database = Self::Connection> + Send + 'a
where
Self: 'a;
async fn begin(&self) -> Result<Self::Transaction<'_>, Error>;
async fn commit(&self) -> Result<Self::Transaction<'_>, Error>;
}
pub struct PgDatabase {
pool: PgPool,
}
impl PgDatabase {
pub fn new(pool: PgPool) -> Self {
Self { pool }
}
}
#[async_trait]
impl Database for PgDatabase {
type Connection = PgPool;
type Transaction<'a> = Transaction<'a, Postgres>;
async fn begin(&self) -> Result<Self::Transaction<'_>, Error> {
self.pool.begin().await
}
async fn commit(&self) -> Result<(), Error> {
self.transaction.commit().await
}
}
pub struct SqliteDatabase {
pool: SqlitePool,
}
impl SqliteDatabase {
pub fn new(pool: SqlitePool) -> Self {
Self { pool }
}
}
#[async_trait]
impl Database for SqliteDatabase {
type Connection = SqlitePool;
type Transaction<'a> = Transaction<'a, Sqlite>;
async fn begin(&self) -> Result<Self::Transaction<'_>, Error> {
self.pool.begin().await
}
async fn commit(&self) -> Result<(), Error> {
self.transaction.commit().await
}
}
/// Row shape produced by the repository's `SELECT * FROM kernel_namespaces` query.
#[derive(Debug, Clone, sqlx::FromRow)]
pub struct Namespace {
    /// Value of the `id` column (the key used for lookups).
    pub id: String,
    /// Value of the `membership` column.
    pub membership: String,
}
pub struct Repository<DB: Database> {
db: DB,
}
impl<DB: Database> Repository<DB> {
pub fn new(db: DB) -> Self {
Self { db }
}
pub async fn find_namespace_by_id<'a, E>(
&self,
executor: Option<E>,
namespace_id: &str,
) -> Result<Namespace, Error>
where
E: Executor<'a, Database = sqlx::Any> + Send,
{
let db = match executor {
Some(db) => executor,
None => self.db.begin().await?,
};
let query = "SELECT * FROM kernel_namespaces WHERE id = $1";
let namespace = sqlx::query_as::<_, Namespace>(query)
.bind(namespace_id)
.fetch_optional(executor)
.await?
.ok_or(Error::RowNotFound)?;
Ok(namespace)
}
}
pub struct Service<DB: Database> {
repo: Repository<DB>,
db: DB,
other_service: OtherService<DB>,
}
impl<DB: Database> Service<DB> {
pub fn new(repo: Repository<DB>, db: DB, other_service: OtherService<DB>) -> Self {
Self {
repo,
db,
other_service,
}
}
pub async fn find_namespace_and_membership(&self, namespace_id: &str) -> Result<String, Error> {
let mut tx = self.db.begin().await?;
let namespace = self
.repo
.find_namespace_by_id(Some(tx), namespace_id)
.await?;
// Perform other operations within the transaction
tx.commit().await?;
Ok(namespace.membership)
}
pub async fn other_service_method(
&self,
executor: Option<E>,
namespace_id: &str,
) -> Result<(), Error>
where
E: Executor<'a, Database = sqlx::Any> + Send,
{
let db = match executor {
Some(db) => executor,
None => self.db.begin(),
};
let _ = self.repo.find_namespace_by_id(db, namespace_id).await?;
// Perform other operations
Ok(())
}
}
pub struct OtherService<DB: Database> {
repo: Repository<DB>,
db: DB,
}
impl<DB: Database> OtherService<DB> {
pub fn new(repo: Repository<DB>, db: DB) -> Self {
Self { repo, db }
}
pub async fn get_beautiful_things(
&self,
executor: Option<E>,
namespace_id: &str,
) -> Result<(), Error>
where
E: Executor<'a, Database = sqlx::Any> + Send,
{
let db = match executor {
Some(db) => executor,
None => self.db.begin(),
};
let _ = self.repo.find_namespace_by_id(db, namespace_id).await?;
// Perform other operations
Ok(())
}
}
/// Connects to PostgreSQL, wires up the services and exercises each entry point once.
#[tokio::main]
async fn main() {
    let pg_pool = match PgPool::connect("postgres://user:password@localhost/db").await {
        Ok(pool) => pool,
        Err(e) => {
            eprintln!("Error: {}", e);
            std::process::exit(1);
        }
    };

    // `PgDatabase`, `Repository` and `OtherService` are not `Clone`, and every
    // constructor takes its arguments by value, so each component gets its own
    // handle. Cloning a sqlx pool only copies a reference-counted handle; all
    // clones share the same underlying connections.
    let new_db = || PgDatabase::new(pg_pool.clone());
    let new_other_service = || OtherService::new(Repository::new(new_db()), new_db());

    let service = Service::new(Repository::new(new_db()), new_db(), new_other_service());
    let other_service = new_other_service();

    match service.find_namespace_and_membership("namespace_id").await {
        Ok(membership) => println!("Membership: {}", membership),
        Err(e) => eprintln!("Error: {}", e),
    }
    // NOTE(review): the executor type parameter cannot be inferred from a bare
    // `None`; an explicit annotation (`None::<...>`) will be needed once the
    // concrete executor type is settled.
    match service.other_service_method(None, "namespace_id").await {
        Ok(_) => println!("Success"),
        Err(e) => eprintln!("Error: {}", e),
    }
    // Report the failure like the calls above instead of panicking.
    if let Err(e) = other_service
        .get_beautiful_things(None, "namespace_id")
        .await
    {
        eprintln!("Error: {}", e);
    }
}