Need help to implement return stream of users using Tiberius SQL

I am implementing a user stream function from SQL Server using Tiberius and a connection pool. I'm struggling to adapt sqlx and PostgreSQL example for Tiberius and SQL Server.

Here is a successful example1 using sqlx:

use futures::TryStreamExt;
use futures_core::stream::BoxStream;
use futures_util::StreamExt;
use sqlx::postgres::PgPoolOptions;
use sqlx::{Pool, Postgres, Row};
use std::error::Error;

#[derive(Debug)]
pub struct User {
    name: String,
}

pub struct SomeRepository {
    pool: Pool<Postgres>,
}

impl SomeRepository {
    pub fn new(pool: Pool<Postgres>) -> Self {
        Self { pool }
    }

    pub fn stream_users2(&self) -> BoxStream<'static, Result<User, sqlx::Error>> {
        sqlx::query("SELECT name FROM users")
            .fetch(&self.pool)
            .map(|result| {
                result.map(|row| User {
                    name: row.get("name"),
                })
            })
            .boxed()
    }
}

impl SomeRepository {}

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    // Set up the connection pool
    let pool = PgPoolOptions::new()
        .max_connections(5)
        .connect("postgres://username:password@localhost/database_name")
        .await?;

    let repo = SomeRepository::new(pool);
    let mut user_stream = repo.stream_users2();
    while let Some(user_result) = user_stream.next().await {
        match user_result {
            Ok(user) => println!("User: {:?}", user.name),
            Err(e) => eprintln!("Error: {:?}", e),
        }
    }

    Ok(())
}

Here is a successful example2 using sqlx

use bb8::Pool;
use bb8_postgres::PostgresConnectionManager;
use futures_core::stream::BoxStream;
use futures_util::StreamExt;
use std::error::Error;
use tokio_postgres::NoTls;

#[derive(Debug)]
pub struct User {
    name: String,
}

pub struct SomeRepository {
    pool: Pool<PostgresConnectionManager<NoTls>>,
}

impl SomeRepository {
    pub fn new(pool: Pool<PostgresConnectionManager<NoTls>>) -> Self {
        Self { pool }
    }

    pub async fn stream_users(&self) -> BoxStream<'static, Result<User, tokio_postgres::Error>> {
        let conn = self.pool.get().await.unwrap();
        let params: Vec<String> = vec![];
        let stream = conn
            .query_raw("SELECT name FROM users", params)
            .await
            .unwrap();

        Box::pin(stream.map(|result| {
            result.map(|row| User {
                name: row.get("name"),
            })
        }))
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let manager = PostgresConnectionManager::new_from_stringlike(
        "host=localhost user=username password=password dbname=database_name",
        NoTls,
    )?;

    let pool = Pool::builder().max_size(5).build(manager).await?;
    let repo = SomeRepository::new(pool);

    let mut user_stream = repo.stream_users().await;
    while let Some(user_result) = user_stream.next().await {
        match user_result {
            Ok(user) => println!("User: {:?}", user.name),
            Err(e) => eprintln!("Error: {:?}", e),
        }
    }

    Ok(())
}

Here is Tiberius, my program won't compile, I don't know how to implement:

use bb8::Pool;
use futures_core::stream::BoxStream;
use std::error::Error;
use tiberius::Client;
use tokio_util::compat::TokioAsyncWriteCompatExt;

#[derive(Debug)]
pub struct User {
    name: String,
}

pub struct SomeRepository {
    pool: DbPool,
}

impl SomeRepository {
    pub fn new(pool: DbPool) -> Self {
        Self { pool }
    }

    pub async fn stream_users(
        &self,
    ) -> Result<BoxStream<'static, Result<User, anyhow::Error>>, anyhow::Error> {
        let mut conn = self.pool.get().await.unwrap();
        let stream = conn.query("select 1", &[]).await?;

        // don't know how to implement in tiberius
        todo!()
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let pool = create_db_pool("my connection stirng").await;
    let repo = SomeRepository::new(pool);

    let mut user_stream = repo.stream_users().await;
    while let Some(user_result) = user_stream.next().await {
        match user_result {
            Ok(user) => println!("User: {:?}", user.name),
            Err(e) => eprintln!("Error: {:?}", e),
        }
    }

    Ok(())
}

pub type DbPool = Pool<TiberiusConnectionManager>;

pub struct TiberiusConnectionManager {
    config: tiberius::Config,
}

impl TiberiusConnectionManager {
    pub fn new(config: tiberius::Config) -> Self {
        Self { config }
    }
}

#[async_trait::async_trait]
impl bb8::ManageConnection for TiberiusConnectionManager {
    type Connection = tiberius::Client<tokio_util::compat::Compat<tokio::net::TcpStream>>;
    type Error = tiberius::error::Error;

    async fn connect(&self) -> Result<Self::Connection, Self::Error> {
        let tcp = tokio::net::TcpStream::connect(self.config.get_addr()).await?;
        tcp.set_nodelay(true)?;
        Client::connect(self.config.clone(), tcp.compat_write()).await
    }

    async fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error> {
        conn.simple_query("SELECT 1").await.map(|_| ())
    }

    fn has_broken(&self, _: &mut Self::Connection) -> bool {
        false
    }
}

pub async fn create_db_pool(connection_string: &str) -> DbPool {
    let mut config = tiberius::Config::from_ado_string(connection_string).unwrap();
    config.trust_cert();
    let manager = TiberiusConnectionManager::new(config);
    Pool::builder().build(manager).await.unwrap()
}

Here is my Cargo.toml

[package]
name = "rust-test"
version = "0.1.0"
edition = "2021"

[dependencies]
anyhow = "1"
async-stream = "0.3.6"
async-trait = "0.1"
axum = "0.7"
bb8 = "0.8"
bb8-postgres = "0.8"
base64 = "0.22"
bb8-tiberius = "0.15"
chrono = "0.4"
futures = "0.3"
futures-core = "0.3"
futures-util = "0.3"
hmac = "0.12"
jsonwebtoken = "9"
pin-utils = "0.1.0"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
sha2 = "0.10"
sqlx = { version = "0.8", features = ["runtime-tokio-rustls", "postgres"] }
tiberius = { version = "0.12", default-features = false, features = [
    "rustls",
    "tds73",
    "winauth",
    "chrono",
    "rust_decimal",
] }
tokio = { version = "1.0", features = ["full"] }
tokio-postgres = "0.7"
tokio-util = { version = "0.7", features = ["compat"] }
tracing = "0.1"
tracing-subscriber = "0.3"

Anyone can help? Thanks a lot.

I can implement with print_users, but don't know how to implement stream_users

use futures::StreamExt;

    pub async fn print_users(&self) -> Result<(), anyhow::Error> {
        let mut conn = self.pool.get().await.unwrap();
        let mut stream = conn
            .query("SELECT name FROM users", &[])
            .await?
            .into_row_stream();
        while let Some(row_result) = stream.next().await {
            let row = row_result?;
            let name = row.get::<&str, _>(0).unwrap().to_string();
            println!("{name}");
        }

        Ok(())
    }

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.