I am implementing a function that streams users from SQL Server using Tiberius and a connection pool. I'm struggling to adapt my sqlx and PostgreSQL examples to Tiberius and SQL Server.
Here is a working example (example 1), using sqlx:
use futures::TryStreamExt;
use futures_core::stream::BoxStream;
use futures_util::StreamExt;
use sqlx::postgres::PgPoolOptions;
use sqlx::{Pool, Postgres, Row};
use std::error::Error;
/// A user record holding a single `name` value.
#[derive(Debug)]
pub struct User {
    /// The user's name.
    name: String,
}
/// Repository that owns a sqlx PostgreSQL connection pool.
pub struct SomeRepository {
    /// Pool that this repository's queries are executed against.
    pool: Pool<Postgres>,
}
impl SomeRepository {
pub fn new(pool: Pool<Postgres>) -> Self {
Self { pool }
}
pub fn stream_users2(&self) -> BoxStream<'static, Result<User, sqlx::Error>> {
sqlx::query("SELECT name FROM users")
.fetch(&self.pool)
.map(|result| {
result.map(|row| User {
name: row.get("name"),
})
})
.boxed()
}
}
// Empty inherent impl block: it adds no methods to `SomeRepository`.
impl SomeRepository {}
/// Connects a pool of at most five connections, streams the users, and prints
/// each user name (or the error for a failed item).
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    // Set up the connection pool
    let options = PgPoolOptions::new().max_connections(5);
    let pool = options
        .connect("postgres://username:password@localhost/database_name")
        .await?;

    let repo = SomeRepository::new(pool);
    let mut user_stream = repo.stream_users2();

    // Drain the stream; a failed item is reported and iteration continues.
    loop {
        match user_stream.next().await {
            Some(Ok(user)) => println!("User: {:?}", user.name),
            Some(Err(e)) => eprintln!("Error: {:?}", e),
            None => break,
        }
    }
    Ok(())
}
Here is a second working example (example 2), using bb8 and tokio-postgres:
use bb8::Pool;
use bb8_postgres::PostgresConnectionManager;
use futures_core::stream::BoxStream;
use futures_util::StreamExt;
use std::error::Error;
use tokio_postgres::NoTls;
/// A user record holding a single `name` value.
#[derive(Debug)]
pub struct User {
    /// The user's name.
    name: String,
}
/// Repository that owns a bb8 pool of `tokio_postgres` connections (no TLS).
pub struct SomeRepository {
    /// Pool from which a connection is checked out for each query.
    pool: Pool<PostgresConnectionManager<NoTls>>,
}
impl SomeRepository {
pub fn new(pool: Pool<PostgresConnectionManager<NoTls>>) -> Self {
Self { pool }
}
pub async fn stream_users(&self) -> BoxStream<'static, Result<User, tokio_postgres::Error>> {
let conn = self.pool.get().await.unwrap();
let params: Vec<String> = vec![];
let stream = conn
.query_raw("SELECT name FROM users", params)
.await
.unwrap();
Box::pin(stream.map(|result| {
result.map(|row| User {
name: row.get("name"),
})
}))
}
}
/// Builds a bb8 pool of at most five connections, streams the users, and
/// prints each user name (or the error for a failed item).
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let manager = PostgresConnectionManager::new_from_stringlike(
        "host=localhost user=username password=password dbname=database_name",
        NoTls,
    )?;
    let builder = Pool::builder().max_size(5);
    let pool = builder.build(manager).await?;

    let repo = SomeRepository::new(pool);
    let mut user_stream = repo.stream_users().await;

    // Drain the stream; a failed item is reported and iteration continues.
    loop {
        match user_stream.next().await {
            Some(Ok(user)) => println!("User: {:?}", user.name),
            Some(Err(e)) => eprintln!("Error: {:?}", e),
            None => break,
        }
    }
    Ok(())
}
Here is my Tiberius version. It does not compile, and I don't know how to implement `stream_users`:
use bb8::Pool;
use futures_core::stream::BoxStream;
use std::error::Error;
use tiberius::Client;
use tokio_util::compat::TokioAsyncWriteCompatExt;
/// A user record holding a single `name` value.
#[derive(Debug)]
pub struct User {
    /// The user's name.
    name: String,
}
/// Repository that owns a pool of Tiberius (SQL Server) connections.
pub struct SomeRepository {
    /// Pool from which a connection is checked out for each query.
    pool: DbPool,
}
impl SomeRepository {
pub fn new(pool: DbPool) -> Self {
Self { pool }
}
pub async fn stream_users(
&self,
) -> Result<BoxStream<'static, Result<User, anyhow::Error>>, anyhow::Error> {
let mut conn = self.pool.get().await.unwrap();
let stream = conn.query("select 1", &[]).await?;
// don't know how to implement in tiberius
todo!()
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
let pool = create_db_pool("my connection stirng").await;
let repo = SomeRepository::new(pool);
let mut user_stream = repo.stream_users().await;
while let Some(user_result) = user_stream.next().await {
match user_result {
Ok(user) => println!("User: {:?}", user.name),
Err(e) => eprintln!("Error: {:?}", e),
}
}
Ok(())
}
/// A bb8 pool whose connections are created by [`TiberiusConnectionManager`].
pub type DbPool = Pool<TiberiusConnectionManager>;
/// bb8 connection manager that opens Tiberius clients from a stored config.
pub struct TiberiusConnectionManager {
    /// Configuration used (cloned) for every new connection.
    config: tiberius::Config,
}
impl TiberiusConnectionManager {
pub fn new(config: tiberius::Config) -> Self {
Self { config }
}
}
#[async_trait::async_trait]
impl bb8::ManageConnection for TiberiusConnectionManager {
    type Connection = tiberius::Client<tokio_util::compat::Compat<tokio::net::TcpStream>>;
    type Error = tiberius::error::Error;

    /// Opens a TCP socket to the configured address, sets `TCP_NODELAY` on
    /// it, and establishes a Tiberius client over that socket.
    async fn connect(&self) -> Result<Self::Connection, Self::Error> {
        let addr = self.config.get_addr();
        let socket = tokio::net::TcpStream::connect(addr).await?;
        socket.set_nodelay(true)?;
        // Tiberius expects futures-style I/O traits, hence the compat wrapper.
        let client = Client::connect(self.config.clone(), socket.compat_write()).await?;
        Ok(client)
    }

    /// Checks a pooled connection by running `SELECT 1` on it.
    async fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error> {
        conn.simple_query("SELECT 1").await?;
        Ok(())
    }

    /// Always reports the connection as not broken.
    fn has_broken(&self, _conn: &mut Self::Connection) -> bool {
        false
    }
}
/// Parses an ADO-style connection string, calls `trust_cert()` on the
/// resulting config, and builds a [`DbPool`] with default pool settings.
///
/// # Panics
///
/// Panics if the connection string cannot be parsed or if the pool cannot be
/// built, because both results are unwrapped.
pub async fn create_db_pool(connection_string: &str) -> DbPool {
    let manager = {
        let mut config = tiberius::Config::from_ado_string(connection_string).unwrap();
        config.trust_cert();
        TiberiusConnectionManager::new(config)
    };
    let built = Pool::builder().build(manager).await;
    built.unwrap()
}
Here is my Cargo.toml:
[package]
name = "rust-test"
version = "0.1.0"
edition = "2021"
[dependencies]
anyhow = "1"
async-stream = "0.3.6"
async-trait = "0.1"
axum = "0.7"
bb8 = "0.8"
bb8-postgres = "0.8"
base64 = "0.22"
bb8-tiberius = "0.15"
chrono = "0.4"
futures = "0.3"
futures-core = "0.3"
futures-util = "0.3"
hmac = "0.12"
jsonwebtoken = "9"
pin-utils = "0.1.0"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
sha2 = "0.10"
sqlx = { version = "0.8", features = ["runtime-tokio-rustls", "postgres"] }
tiberius = { version = "0.12", default-features = false, features = [
"rustls",
"tds73",
"winauth",
"chrono",
"rust_decimal",
] }
tokio = { version = "1.0", features = ["full"] }
tokio-postgres = "0.7"
tokio-util = { version = "0.7", features = ["compat"] }
tracing = "0.1"
tracing-subscriber = "0.3"
Can anyone help? Thanks a lot.