Streaming rows from a database and mutating them in Rust: updating a row requires multiple mut borrows of tx

Hi folks,

This is essentially what I'm trying to do:

use futures::TryStreamExt; // for Stream::try_next()
use sqlx::{Postgres, Transaction};

pub async fn migrate_data(tx: &mut Transaction<'_, Postgres>) -> Result<(), sqlx::Error> {
    let mut rows = sqlx::query!(r#"SELECT id, data FROM table"#)
        .fetch(&mut **tx);

    while let Some(r) = rows.try_next().await? {
        // Read the data in and mutate it.
        let data = mutate_data(r.data);

        // Now store it back out:
        sqlx::query!(
            r#"
                UPDATE table
                SET data = $1
                WHERE id = $2
            "#,
            data,
            r.id
        )
        .execute(&mut **tx) // ERROR: double mut borrow; `rows` still borrows the connection
        .await?;
    }

    Ok(())
}

I can get around this by paginating on the ID. Fine, no problem there, and maybe it's even better in every way?

But this seems like a common pattern, so I was wondering how you might solve this?

I'm not familiar with sqlx, but presumably the reason that rows exclusively borrows tx and exposes an await-based interface is that the data is being streamed from the database, and the connection can't be used for anything else during that time. (If that's not in fact the case, then requiring &mut is a deficiency in the libraries you are using.) So, your choices are:

  • Use two database connections. (Presumably this defeats the point of using a transaction, so it is not an option.)
  • Retrieve all the data (either the original form or the mutated form) and store it locally (such as in a Vec) before executing the updates.
    let rows = sqlx::query!(r#"SELECT id, data FROM table"#)
        .fetch_all(&mut **tx)
        .await?;

rows will be a Vec<R> for some row struct R, eagerly pulled into memory, consuming the entire result set in the process. That's effectively what you would be doing if you built up a structure of id-to-data anyway, so if your data fits handily in RAM and doesn't take an inordinate amount of time to fetch fully, it's an easy option.
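
Put together, a buffered version of your function might look something like this (just a sketch, reusing your mutate_data helper):

use sqlx::{Postgres, Transaction};

pub async fn migrate_data(tx: &mut Transaction<'_, Postgres>) -> Result<(), sqlx::Error> {
    // Fully drain the SELECT into memory; the mutable borrow of the
    // connection ends as soon as fetch_all() completes.
    let rows = sqlx::query!(r#"SELECT id, data FROM table"#)
        .fetch_all(&mut **tx)
        .await?;

    // The connection is free again, so each UPDATE can borrow it in turn.
    for r in rows {
        let data = mutate_data(r.data);
        sqlx::query!(
            r#"UPDATE table SET data = $1 WHERE id = $2"#,
            data,
            r.id
        )
        .execute(&mut **tx)
        .await?;
    }

    Ok(())
}

The key point is that fetch_all fully consumes the result set, so no borrow of tx is still alive when the first UPDATE runs.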

The underlying issue is that many database protocols do not support starting a second query while streaming the result set from the first query. Postgres' wire protocol does support this in a limited capacity (described here, in the section on query pipelining), but you're not guaranteed to get the response to your UPDATEs until the results from the SELECT are fully consumed, which means that the algorithm you wrote might deadlock waiting for the database.

The conventional and most portable way to do this is to avoid interleaving queries like this. Fully consume the response to one query before starting the next. If the dataset is too large to process in one pass, paginate your queries, either using LIMITs or by taking advantage of the structure of the data in some way.
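
For instance, a keyset-paginated sketch (assuming id is a BIGINT primary key, i.e. i64 on the Rust side, and an arbitrary batch size of 1000):

pub async fn migrate_data(tx: &mut Transaction<'_, Postgres>) -> Result<(), sqlx::Error> {
    let mut last_id: i64 = 0; // assumes ids are positive

    loop {
        // Each batch is fully consumed before its updates start.
        let rows = sqlx::query!(
            r#"SELECT id, data FROM table WHERE id > $1 ORDER BY id LIMIT 1000"#,
            last_id
        )
        .fetch_all(&mut **tx)
        .await?;

        if rows.is_empty() {
            break;
        }

        for r in rows {
            last_id = r.id;
            let data = mutate_data(r.data);
            sqlx::query!(
                r#"UPDATE table SET data = $1 WHERE id = $2"#,
                data,
                r.id
            )
            .execute(&mut **tx)
            .await?;
        }
    }

    Ok(())
}

Keying each batch on the last id seen (rather than OFFSET) keeps every batch query cheap even on large tables.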

Right, thank you both (& @kpreid). I suspected I might have to paginate, and collect in RAM.

Thanks for your second opinion.