Diesel Transaction & AsyncFn(Once)

Hello, I'm trying to have a callback from within and asynchronous Diesel Transaction that will do some work aborting the transaction if it (the callback) returns an error.

I have the following:

    pub async fn send_verification_email<F>(
        &self,
        email_id: i32,
        validation_id: i32,
        callback: F,
    ) -> Result<(), Error>
    where
        F: AsyncFnOnce() -> Result<(), String> + Send,
    {
        let now: jiff_diesel::Timestamp = jiff::Timestamp::now().into();
        let result = self
            .connection()
            .await?
            .transaction(|mut conn| {
                use schema::lzd::user_email;
                async move {
                    if let Err(err) = diesel::update(user_email::table)
                        .filter(user_email::id.eq(email_id))
                        .set((
                            user_email::valid.eq(false),
                            user_email::validation_id.eq(validation_id),
                            user_email::updated.eq(now),
                            user_email::updated_by_user.eq(user_email::user_id),
                        ))
                        .execute(&mut conn)
                        .await
                    {
                        return Err(err.into());
                    }
                    callback().await.map_err(Error::from)
                }
                .scope_boxed()
            })
            .await;
        result
    }

Unfortunately, this doesn't seem to work. The scope_boxed() call complains with

error: future cannot be sent between threads safely
   --> crates/lzd-db/src/lib.rs:235:18
    |
235 |                 .scope_boxed()
    |                  ^^^^^^^^^^^ future created by async block is not `Send`
    |
    = help: within `{async block@crates/lzd-db/src/lib.rs:219:17: 219:27}`, the trait `Send` is not implemented for `<F as AsyncFnOnce<()>>::CallOnceFuture`
note: future is not `Send` as it awaits another future which is not `Send`
   --> crates/lzd-db/src/lib.rs:233:21
    |
233 |                     callback().await.map_err(Error::from)
    |                     ^^^^^^^^^^ await occurs here on type `<F as AsyncFnOnce<()>>::CallOnceFuture`, which is not `Send`
note: required by a bound in `scope_boxed`
   --> /home/gbutler/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/scoped-futures-0.1.4/src/lib.rs:117:15
    |
115 |     fn scope_boxed<'upper_bound, 'subject>(self) -> ScopedBoxFuture<'upper_bound, 'subject, <Self as Future>::Output>
    |        ----------- required by a bound in this associated function
116 |     where
117 |         Self: Send + Future + 'subject;
    |               ^^^^ required by this bound in `ScopedFutureExt::scope_boxed`
help: consider further restricting the associated type
    |
211 |         F: AsyncFnOnce() -> Result<(), String> + Send, <F as AsyncFnOnce<()>>::CallOnceFuture: Send
    |                                                      ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

I'm not sure how to add the additional "Send" bound that it is requesting and I'm not even sure that this is even possible.

Is this possible?

I also tried the following after some experimentation:

    pub async fn send_verification_email<F>(
        &self,
        email_id: i32,
        validation_id: i32,
        callback: F,
    ) -> Result<(), Error>
    where
        F: AsyncFnOnce() -> Result<(), String> + Send,
        <F as AsyncFnOnce() -> Result<(), String>>::CallOnceFuture: Send,
    {
        let now: jiff_diesel::Timestamp = jiff::Timestamp::now().into();
        let result = self
            .connection()
            .await?
            .transaction(move |mut conn| {
                use schema::lzd::user_email;
                async move {
                    if let Err(err) = diesel::update(user_email::table)
                        .filter(user_email::id.eq(email_id))
                        .set((
                            user_email::valid.eq(false),
                            user_email::validation_id.eq(validation_id),
                            user_email::updated.eq(now),
                            user_email::updated_by_user.eq(user_email::user_id),
                        ))
                        .execute(&mut conn)
                        .await
                    {
                        return Err(err.into());
                    }
                    callback().await.map_err(Error::from)
                }
                .scope_boxed()
            })
            .await;
        result
    }

But this doesn't work either. It complains that that the <F as AsyncFnOnce() -> Result<(), String>>::CallOnceFuture: Send, is an unstable feature.

This is a major known limitation of the new AsyncFn* traits. There is currently no way to get this working, as that requires either stabilizing access to that associated type or a different syntax to specify that Send bound. As far as I know the language team is aware of that and actively working on resolving this in future rust releases.

For now the only solution to make such a pattern work it to use the same pattern diesel-async uses, which means using that ScopeBoxedFuture type as return type of your closure and relay on the old Fn* traits to express that bound. You likely need a bound similar to that one.

3 Likes