Rusqlite rows into actix_web stream - resolved with async_stream

motivation

Actix-web can take a stream. Rusqlite can make an iterator over query results. Wouldn't it be cool to put those together so you can stream arbitrary numbers of records from a single query?

I want to do this to synchronize two databases, so it could be a big query.

the problem

In actix, HttpResponse::Ok().streaming(my_records_stream) requires my_records_stream to have a 'static lifetime.

Unfortunately, in rusqlite a query returns Rows<'a>, which borrows from Statement<'a>, which in turn borrows from Connection.
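Here's a minimal sketch of that borrow chain (the zknote table name is hypothetical), showing why none of it can be 'static:

use rusqlite::Connection;

// Each layer borrows the one below it, so none of them can outlive conn.
fn print_ids(conn: &Connection) -> rusqlite::Result<()> {
  let mut stmt = conn.prepare("SELECT id FROM zknote")?; // Statement<'_> borrows conn
  let mut rows = stmt.query([])?; // Rows<'_> borrows stmt
  while let Some(row) = rows.next()? {
    let id: i64 = row.get(0)?;
    println!("{id}");
  }
  Ok(())
}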

current failed solution

After quite a bit of messing around, here's my latest non-working solution, using the ouroboros library:

use std::error::Error;

use ouroboros::self_referencing;
use rusqlite::Connection;

// Self-referential struct: pstmt borrows conn, and rows borrows pstmt.
#[self_referencing]
pub struct ZkNoteStream {
  conn: Connection,
  #[borrows(conn)]
  pstmt: rusqlite::Statement<'this>,
  #[borrows(mut pstmt)]
  #[covariant]
  rows: rusqlite::Rows<'this>,
}

impl ZkNoteStream {
  // build_sql and ZkNoteSearch are app-specific; build_sql produces the
  // query string and its parameters.
  pub fn init(conn: Connection, user: i64, search: &ZkNoteSearch) -> Result<Self, Box<dyn Error>> {
    let (sql, args) = build_sql(&conn, user, search.clone())?;

    Ok(
      ZkNoteStreamTryBuilder {
        conn,
        pstmt_builder: |conn: &Connection| conn.prepare(sql.as_str()),
        rows_builder: |pstmt: &mut rusqlite::Statement<'_>| {
          pstmt.query(rusqlite::params_from_iter(args.iter()))
        },
      }
      .try_build()?,
    )
  }
}

This is just trying to build the data structure; if it worked, I'd add the impl for the Bytes stream actix requires.

However, it doesn't work! The failure:

error[E0597]: `pstmt` does not live long enough
   --> server-lib/src/search.rs:880:1
    |
880 | #[self_referencing]
    | ^^^^^^^^^^^^^^^^^^-
    | |                 |
    | |                 `pstmt` dropped here while still borrowed
    | |                 borrow might be used here, when `pstmt` is dropped and runs the `Drop` code for type `Statement`
    | borrowed value does not live long enough
    |
    = note: this error originates in the attribute macro `self_referencing` (in Nightly builds, run with -Z macro-backtrace for more info)

help!

Help addressing this specific error would be great.

I'm also open to other solutions to this problem. I've considered just using OFFSET and LIMIT in my SQL to page results across multiple queries, but then the results aren't a point-in-time snapshot, so records can be missed or duplicated if the tables change between queries. That's my fallback, though.
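For reference, that fallback would look something like this sketch (hypothetical table name and page size); each page is a separate query, so rows inserted or deleted between pages shift the offsets:

use rusqlite::Connection;

// One page of results; across pages this is not a point-in-time snapshot.
fn page_of_ids(conn: &Connection, offset: i64) -> rusqlite::Result<Vec<i64>> {
  let mut stmt = conn.prepare("SELECT id FROM zknote ORDER BY id LIMIT 100 OFFSET ?1")?;
  let rows = stmt.query_map([offset], |row| row.get(0))?;
  rows.collect()
}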

The async_stream crate seems promising - you can run a computation in async code and return a series of values with yield. Perfect. But it requires a pin_mut!(stream) outside of the stream itself, so the stream isn't self-contained, i.e. not 'static, and can't be passed to actix.
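The yield mechanics look roughly like this (a toy example, not the database code):

use async_stream::stream;
use futures_util::Stream;

// stream! turns the block into an anonymous type implementing Stream.
fn numbers() -> impl Stream<Item = u32> {
  stream! {
    for i in 0..3 {
      yield i;
    }
  }
}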

Don't use ouroboros; it's unsound. (To my knowledge, all of the popular self-referential libraries are, too, so don't use anything else, either.)


I can't make your code compile, because it's missing way more detail than I'm willing to figure out/surmise, but this is the core idea. TL;DR: just don't be self-referential.

I'm totally down with not being self-referential. But what other solution is there in this case?

As demonstrated by my playground above, you basically have to kick the can down the road and work with/store references to the connection, then propagate this requirement (i.e., the need for a valid reference) up the chain of users. Eventually you'll reach a level where the connection can be stored by value for long enough that all of its users are satisfied with the lifetime of a reference to it.
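In sketch form, that means the stream's type carries the borrow (a minimal example with a hypothetical zknote table, written with async_stream since that's where the thread ends up):

use async_stream::try_stream;
use futures_util::Stream;
use rusqlite::Connection;

// The returned stream borrows conn, so the caller must keep the Connection
// alive for as long as the stream is polled - the requirement propagates up.
fn ids<'a>(conn: &'a Connection) -> impl Stream<Item = Result<i64, rusqlite::Error>> + 'a {
  try_stream! {
    let mut stmt = conn.prepare("SELECT id FROM zknote")?;
    let mut rows = stmt.query([])?;
    while let Some(row) = rows.next()? {
      let id: i64 = row.get(0)?;
      yield id;
    }
  }
}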

I would prefer that, but the problem is that this all takes place in an actix handler, which returns an HttpResponse. I can pass the stream into that with HttpResponse::Ok().streaming(my_stream), but then the stored references that were used to make the stream die at the end of the handler function.

No, you only need to use pin_mut! if you want to iterate over the stream by calling next on it.


Can you rewrite your code into something like this to make it work? Basically, what I'm trying to show you is that when you wrap your Connection in an Arc and create your query within the stream! macro, you are able to get a stream that satisfies the 'static bound.


OMG! That did it! The async_stream way is super elegant. Just have to convert the records to Bytes:

use actix_web::web::Bytes;
use async_stream::try_stream;
use futures_util::Stream;
use rusqlite::Connection;
use std::sync::Arc;

pub fn zkn_stream(
  conn: Arc<Connection>,
  user: i64,
  search: ZkNoteSearch,
) -> impl Stream<Item = Result<Bytes, Box<dyn std::error::Error>>> + 'static {
  try_stream! {
    // The connection, statement, and rows all live inside the stream,
    // so the stream owns everything it borrows from.
    let (sql, args) = build_sql(&conn, user, search.clone())?;
    let mut stmt = conn.prepare(sql.as_str())?;
    let mut rows = stmt.query(rusqlite::params_from_iter(args.iter()))?;

    while let Some(row) = rows.next()? {
      let zln = ZkListNote {
        id: row.get(0)?,
        title: row.get(1)?,
        is_file: {
          let wat: Option<i64> = row.get(2)?;
          wat.is_some()
        },
        user: row.get(3)?,
        createdate: row.get(4)?,
        changeddate: row.get(5)?,
        sysids: Vec::new(),
      };

      yield Bytes::from(serde_json::to_value(zln)?.to_string());
    }
  }
}

And in the actix handler it looks like this:

let search: ZkNoteSearch = serde_json::from_value(msgdata.clone())?;
let conn = Arc::new(sqldata::connection_open(
  config.orgauth_config.db.as_path(),
)?);
let znsstream = search::zkn_stream(conn, uid, search);
Ok(HttpResponse::Ok().streaming(znsstream))

So that compiles and runs now! I haven't tested it on thousands of records yet, but hopefully it should be good for that.


Probably not. Rusqlite is blocking, and you're going to spend a ton of time in your while let Some(row) = rows.next()? { ... } loop without ever hitting an await point. Your async runtime hates that. So the next step would be to move that part of your code into a blocking task and have that task yield rows to your stream via messages. Here's another example. Note that Connection is not thread-safe, so I just created the Connection within that task rather than passing it in from the outside. I think this would be the right way to do it to begin with, as Connection is not a connection pool but represents a single connection.
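A sketch of that pattern, assuming a db_path argument and a simplified two-column query (the zkn_stream_blocking name and the query are placeholders; real code would reuse build_sql and ZkListNote from above):

use actix_web::web::Bytes;
use futures_util::Stream;
use rusqlite::Connection;
use std::path::PathBuf;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;

// The rusqlite loop runs on the blocking thread pool and sends rows through
// a channel; the handler streams the receiving end. The Connection is created
// inside the task, so nothing non-thread-safe crosses threads.
fn zkn_stream_blocking(
  db_path: PathBuf,
) -> impl Stream<Item = Result<Bytes, rusqlite::Error>> + 'static {
  let (tx, rx) = mpsc::channel(64);
  tokio::task::spawn_blocking(move || {
    let result = (|| -> rusqlite::Result<()> {
      let conn = Connection::open(db_path)?;
      let mut stmt = conn.prepare("SELECT id, title FROM zknote")?;
      let mut rows = stmt.query([])?;
      while let Some(row) = rows.next()? {
        let id: i64 = row.get(0)?;
        let title: String = row.get(1)?;
        let json = serde_json::json!({ "id": id, "title": title }).to_string();
        // blocking_send is fine here: we're on the blocking pool, not the reactor.
        if tx.blocking_send(Ok(Bytes::from(json))).is_err() {
          break; // receiver dropped (client went away); stop early
        }
      }
      Ok(())
    })();
    if let Err(e) = result {
      let _ = tx.blocking_send(Err(e));
    }
  });
  ReceiverStream::new(rx)
}

The ReceiverStream can then be handed to HttpResponse::Ok().streaming() just like the try_stream! version.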


As is, it streams 10232 records (6492096 bytes) from a localhost server in about 2.5 seconds. Not bad!

Thanks for the tip about keeping the runtime happy. I'll take a look at that as I finish this part of the app. It's not crucial right now, as there aren't any users yet besides me, but I do want to get this technique sorted for higher-performance situations.
