FnOnce + async lifetime struggles

Hello, everyone. I believe my question may be similar to others in this forum; however, I'm having trouble understanding the answers and applying them to my problem. It all seems to go over my head, so I can't grasp the actual problem, let alone find a way to solve it.

I have the following code, which compiles and works. I have an async tauri command, which is supposed to call an async service function. This function should be called within a db transaction. Because diesel does not support async transactions, I have to juggle between async code and a blocking thread.

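use std::error::Error;
use std::fmt::{Debug, Display, Formatter};

use diesel::prelude::*; // brings the Connection trait (and its transaction method) into scope
use diesel::r2d2::{ConnectionManager, PooledConnection};
use diesel::sqlite::SqliteConnection;

#[tauri::command]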
pub async fn some_tauri_command(data: &str, state: tauri::State<'_, AppState>) -> Result<(), String> {
    let data = data.to_owned();
    let mut c: PooledConnection<ConnectionManager<SqliteConnection>> = state.pool.get().unwrap();

    let result_in_result: Result<Result<(), TxError>, tauri::Error> = tauri::async_runtime::spawn_blocking(move || {
        let handle = tauri::async_runtime::TokioHandle::current();
        let inner_result: Result<(), TxError> = c.transaction(|conn| {
            handle.block_on(async {
                SomeService::do_async(&data, conn)
                    .await
                    .map_err(|err| TxError::Other(err))
            })
        });
        inner_result
    }).await;

    let flattened_result: Result<(), TxError> = result_in_result
        .map_err(|err| TxError::Other(Box::new(err)))
        .and_then(|res| res);

    flattened_result.map_err(|err| err.to_string())
}

struct SomeService {}

impl SomeService {
    pub async fn do_async(data: &String, conn: &mut SqliteConnection) -> Result<(), Box<dyn Error>> {
        // do something with io (therefore async preferable), the db connection and data
        Ok(())
    }
}

And the error type, which I need to keep diesel satisfied:

pub enum TxError {
    Other(Box<dyn Error>),
    DieselError(diesel::result::Error),
}

impl From<diesel::result::Error> for TxError {
    fn from(value: diesel::result::Error) -> Self {
        TxError::DieselError(value)
    }
}

impl Debug for TxError {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        match self {
            TxError::DieselError(err) => std::fmt::Debug::fmt(&err, f),
            TxError::Other(err) => std::fmt::Debug::fmt(&err, f),
        }
    }
}

impl Display for TxError {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        match self {
            TxError::DieselError(err) => std::fmt::Display::fmt(&err, f),
            TxError::Other(err) => std::fmt::Display::fmt(&err, f),
        }
    }
}

impl Error for TxError {}

unsafe impl Send for TxError {}

As I need to do this in multiple commands, I was hoping to extract the thread juggling into a utility function, do_transactional. Unfortunately, I cannot get this to work. Here is what I am trying:

#[tauri::command]
pub async fn some_tauri_command(data: &str, state: tauri::State<'_, AppState>) -> Result<(), String> {
    let data = data.to_owned();
    let pool: &Pool<ConnectionManager<SqliteConnection>> = &state.pool;

    let result: Result<(), TxError> = do_transactional(pool, move |conn| {
        SomeService::do_async(data, conn)
    }).await;
    result.map_err(|err| err.to_string())
}

struct SomeService {}

impl SomeService {
    pub async fn do_async(data: String, conn: &mut SqliteConnection) -> Result<(), Box<dyn Error>> {
        // do something with io (therefore async preferable), the db connection and data
        Ok(())
    }
}

pub async fn do_transactional<T, F, Fut>(pool: &Pool<ConnectionManager<SqliteConnection>>, f: F) -> Result<T, TxError>
    where
        T: Send + 'static,
        F: (FnOnce(&mut SqliteConnection) -> Fut) + Send + 'static,
        Fut: Future<Output=Result<T, Box<dyn Error>>>,
{
    let mut c = pool.get().map_err(|err| TxError::Other(Box::new(err)))?;

    let thread_result: Result<Result<T, TxError>, tauri::Error> = tauri::async_runtime::spawn_blocking(move || {
        let handle = tauri::async_runtime::TokioHandle::current();
        let transaction_result: Result<T, TxError> = c.transaction(|conn| {
            let result: Result<T, Box<dyn Error>> = handle.block_on(async {
                f(conn).await
            });
            result.map_err(|err| TxError::Other(err))
        });
        transaction_result
    }).await;

    thread_result
        .map_err(|err| TxError::Other(Box::new(err)))
        .and_then(|res| res)
}

which gives the following error:

error: lifetime may not live long enough
  --> src/commands/commands_commands.rs:40:9
   |
38 |     let result = do_transactional(pool, move |conn| {
   |                                               ----- return type of closure `impl futures::Future<Output = Result<(), Box<(dyn StdError + 'static)>>>` contains a lifetime `'2`
   |                                               |
   |                                               has type `&'1 mut SqliteConnection`
39 |         let data = data.clone();
40 |         SomeService::do_async(data, conn)
   |         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ returning this value requires that `'1` must outlive `'2`

Apparently Rust wants conn to live at least as long as the value returned by the FnOnce. Why wasn't this a problem in the working example above? I don't understand what has changed in the relationship between their lifetimes. And, obviously, I would like to know how to fix the issue.

Futures don't run until awaited. When you call an async function like do_async, it doesn't do anything except put all of its arguments in a struct.

This code:

    do_transactional(pool, move |conn| {
        SomeService::do_async(data, conn)
    })

behaves like:

    do_transactional(pool, move |conn| {
        Future { data, conn }
    })

So the Future returned by an async fn always holds all of its arguments, and therefore the lifetime of that Future depends on the lifetimes of all of its arguments.
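A std-only sketch of this, where Conn is a hypothetical stand-in for SqliteConnection and do_async_desugared only approximates what the compiler generates for the async fn. The returned future is tied to the lifetime of conn, and building it does no work at all:

```rust
use std::future::Future;

/// Hypothetical stand-in for SqliteConnection, so the sketch runs without diesel.
pub struct Conn {
    pub writes: u32,
}

/// The poster's do_async, with the stand-in connection type.
pub async fn do_async(data: String, conn: &mut Conn) -> usize {
    conn.writes += 1; // only happens once the future is polled
    data.len()
}

/// Roughly what the compiler turns do_async into: a plain function that packs
/// both arguments into a future. The `+ 'a` is the important part: the future
/// holds the `&'a mut Conn`, so it cannot outlive the borrow of the connection.
pub fn do_async_desugared<'a>(
    data: String,
    conn: &'a mut Conn,
) -> impl Future<Output = usize> + 'a {
    async move {
        conn.writes += 1;
        data.len()
    }
}
```

Dropping either future without awaiting it leaves conn.writes at 0, and conn stays mutably borrowed for as long as the future exists.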

In generic Rust code nothing is allowed unless you explicitly allow it. In your case, the definition of the callback does not say that the callback is allowed to use conn beyond the synchronous call.

Unfortunately, lifetime annotations needed here are awkward.

You need the for<'a> syntax to declare a new lifetime for the callback function. If you put the lifetime on the trait or some outer scope instead, it would mean that every connection has to outlive the whole SomeService and that no connections can be created after it.

for<'conn> FnOnce(&'conn mut SqliteConnection) 

but the PITA is that this for lifetime syntactically exists only on this one line, and you need to make Fut depend on it. This isn't valid: for<'conn> FnOnce(&'conn mut SqliteConnection) -> Fut + 'conn

This is valid, but requires the closure to return a boxed future, Box::pin(async {}) or Box::pin(some_async_fn(...)), rather than the bare future of an async fn:

F: for<'conn> FnOnce(&'conn mut SqliteConnection) -> BoxFuture<'conn, Result<T, Box<dyn Error>>>

I vaguely remember this could be worked around with an extra helper struct or trait (edit: here it is).
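A minimal sketch of the BoxFuture version, assuming a hypothetical Conn in place of SqliteConnection, a toy busy-polling block_on in place of Tokio's Handle::block_on, and no real pool, thread or transaction. The BoxFuture alias has the same shape as futures::future::BoxFuture:

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Same shape as futures::future::BoxFuture: a boxed, pinned, Send future
/// that is allowed to borrow data living for 'a.
pub type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;

/// Hypothetical stand-in for SqliteConnection.
pub struct Conn {
    pub writes: u32,
}

/// The service function: its future borrows the connection.
pub async fn do_async(data: String, conn: &mut Conn) -> Result<usize, String> {
    conn.writes += 1;
    Ok(data.len())
}

struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Toy executor standing in for Handle::block_on. It busy-polls, so it is
/// only suitable for futures that never really wait on I/O.
pub fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = std::pin::pin!(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
        std::thread::yield_now();
    }
}

/// Simplified do_transactional (no pool, thread, or real transaction).
/// for<'conn>: f must accept a borrow of any lifetime chosen in here.
/// BoxFuture<'conn, _>: the future that f returns may hold that borrow.
pub fn do_transactional<T, F>(conn: &mut Conn, f: F) -> Result<T, String>
where
    F: for<'conn> FnOnce(&'conn mut Conn) -> BoxFuture<'conn, Result<T, String>>,
{
    block_on(f(conn))
}
```

A call then looks like do_transactional(&mut conn, move |c| Box::pin(do_async(data, c))): the boxed future is allowed to borrow c because its type mentions 'conn.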

As an (important) side note:

That's unsoundness (potential UB) right there. Your TxError cannot soundly be Send, because nothing in the Other variant enforces it. Please absolutely DO NOT use unsafe if you don't know what you are doing. It's perfectly possible to make your TxError: Send in 100% safe code by putting Box<dyn Error + Send> into the Other variant, and likewise changing all other explicit error type annotations to Box<dyn Error + Send>.
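A sketch of the safe version, with DbError as a hypothetical stand-in for diesel::result::Error and Display written by hand (thiserror would generate it). Because the Other variant holds Box<dyn Error + Send>, the compiler proves TxError: Send on its own, so the unsafe impl can simply be deleted:

```rust
use std::error::Error;
use std::fmt;

/// Hypothetical stand-in for diesel::result::Error.
#[derive(Debug)]
pub struct DbError(pub String);

impl fmt::Display for DbError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(&self.0)
    }
}

impl Error for DbError {}

#[derive(Debug)]
pub enum TxError {
    /// `+ Send` makes the compiler check that every boxed error is Send.
    Other(Box<dyn Error + Send>),
    DieselError(DbError),
}

impl From<DbError> for TxError {
    fn from(value: DbError) -> Self {
        TxError::DieselError(value)
    }
}

impl fmt::Display for TxError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            TxError::DieselError(err) => fmt::Display::fmt(err, f),
            TxError::Other(err) => fmt::Display::fmt(err, f),
        }
    }
}

impl Error for TxError {}

// No `unsafe impl Send` needed: this line fails to compile if TxError is not Send.
fn assert_send<T: Send>() {}
const _: fn() = assert_send::<TxError>;
```

Since TxError is now genuinely Send, it can be moved to another thread (as spawn_blocking does with its result) without any unsafe code.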

You can also remove the impl Debug + Display + Error boilerplate using thiserror.

A helper trait can convert the extra Fut type into an associated type and allow it to be in scope of the for: https://docs.rs/async_fn_traits/latest/async_fn_traits/
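To illustrate the idea, here is a simplified, hand-rolled version of such a helper trait. It is not the crate's actual definition, and Conn, bump, run_with_conn and the toy block_on are hypothetical. It works when you pass an async fn item such as bump directly; passing a closure to the same kind of bound is exactly where the inference problems described later in this thread show up.

```rust
use std::future::Future;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Simplified helper trait in the spirit of async_fn_traits::AsyncFnOnce1.
/// The future type is an associated type, so a for<'conn> bound can name it.
pub trait AsyncFnOnce1<Arg> {
    type Output;
    type OutputFuture: Future<Output = Self::Output>;
    fn invoke_once(self, arg: Arg) -> Self::OutputFuture;
}

/// Every FnOnce(Arg) -> Fut whose Fut is a future gets the trait for free.
impl<Arg, F, Fut> AsyncFnOnce1<Arg> for F
where
    F: FnOnce(Arg) -> Fut,
    Fut: Future,
{
    type Output = Fut::Output;
    type OutputFuture = Fut;
    fn invoke_once(self, arg: Arg) -> Fut {
        self(arg)
    }
}

/// Hypothetical stand-in for SqliteConnection.
pub struct Conn {
    pub writes: u32,
}

/// An async fn item: unlike a closure, its signature is already higher-ranked.
pub async fn bump(conn: &mut Conn) -> u32 {
    conn.writes += 1;
    conn.writes
}

struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Toy busy-polling executor; only fit for futures that never actually wait.
pub fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = std::pin::pin!(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
        std::thread::yield_now();
    }
}

/// The bound never requires OutputFuture to outlive 'conn,
/// so the future is free to borrow the connection.
pub fn run_with_conn<F, R>(conn: &mut Conn, f: F) -> R
where
    F: for<'conn> AsyncFnOnce1<&'conn mut Conn, Output = R>,
{
    block_on(f.invoke_once(conn))
}
```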

Thanks, great advice!

@kornel @kpreid

Thank you very much for your replies!

I tried adjusting the function according to your suggestions, but unfortunately I still cannot get it to work.

I changed it to:

 pub async fn do_transactional<T, F>(&self, f: F) -> Result<T, TxError>
        where
            T: Send + 'static,
            F: for<'conn> AsyncFn1<&'conn mut SqliteConnection, Output=Result<T, Box<dyn Error + Send>>> + Send + 'static,
...

However, now I am getting two error messages:

The first one is still the same

error: lifetime may not live long enough
  --> src/commands/import_account_link_commands.rs:36:9
35 |     let x = state.do_transactional(move |conn| {
   |                                    -----------
   |                                    |         |
   |                                    |         return type of closure `impl futures::Future<Output = Result<(), Box<(dyn StdError + 'static)>>>` contains a lifetime `'2`
   |                                    lifetime `'1` represents this closure's body
36 |         SomeService::do_async(data, conn)
   |         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ returning this value requires that `'1` must outlive `'2`
   |
   = note: closure implements `Fn`, so references to captured variables can't escape the closure

and a new one

error: implementation of `FnOnce` is not general enough
  --> src/commands/import_account_link_commands.rs:35:13
   |
35 |       let x = state.do_transactional(move |conn| {
   |  _____________^
36 | |         SomeService::do_async(data, conn)
37 | |     });
   | |______^ implementation of `FnOnce` is not general enough
   |
   = note: closure with signature `fn(&'2 mut SqliteConnection) -> impl futures::Future<Output = Result<(), Box<(dyn StdError + 'static)>>>` must implement `FnOnce<(&'1 mut SqliteConnection,)>`, for any lifetime `'1`...
   = note: ...but it actually implements `FnOnce<(&'2 mut SqliteConnection,)>`, for some specific lifetime `'2`

Also, I don't understand why creating the new 'conn lifetime would help. How does Rust know how long this lifetime has to be valid? What is actually happening when this lifetime is defined? And why wasn't the connection's lifetime an issue before, when I had not yet tried to generalize the code into a do_transactional function?

You've quoted everything but the actual error message. Read (and share) the first line, that starts with error:. All the other lines are further details and advice but they must be understood in context of the primary message.

Oh, sorry. That was very smart. Edited the previous message

Before, all the code involved was inside a single function, which allowed the compiler to know all the lifetimes involved. Now you want to move part of the code into a separate function, and this introduces a boundary that the compiler can't see through. Thus you need to annotate this boundary so that the compiler can check that both sides are correct in isolation; these annotations are the trait bounds on do_transactional.

Now, in your trait bounds you have to specify the lifetime of the reference taken by the closure. You can't add a lifetime parameter to your function, because the caller gets to choose it, but the actual lifetime is determined inside do_transactional, not by the caller. What you need is a way to say that the closure passed to do_transactional needs to be valid for some lifetime unknown to the caller. This is, however, the same as saying that it needs to be valid for any lifetime, and that's what for<'conn> expresses.
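A small synchronous sketch of "the callee picks the lifetime", with hypothetical Conn and with_conn:

```rust
/// Hypothetical stand-in for a pooled connection.
pub struct Conn {
    pub writes: u32,
}

/// The connection only exists inside this function, so only this function
/// knows how long the borrow handed to f lasts. The caller cannot name that
/// lifetime, so f must accept *any* lifetime: that is what for<'conn> says.
/// (A plain `F: FnOnce(&mut Conn) -> R` means the same thing via elision.)
///
/// Writing `fn with_conn<'a, F: FnOnce(&'a mut Conn) -> R>` instead would let
/// the caller choose 'a, and borrowing the local conn for a caller-chosen
/// lifetime would be rejected.
///
/// R is declared outside the for<'conn> binder, so it cannot borrow from the
/// connection; a callback whose returned future holds conn needs a return
/// type that mentions 'conn, which is where BoxFuture<'conn, _> comes in.
pub fn with_conn<F, R>(f: F) -> (R, u32)
where
    F: for<'conn> FnOnce(&'conn mut Conn) -> R,
{
    let mut conn = Conn { writes: 0 };
    let result = f(&mut conn); // the borrow ends when this call returns
    (result, conn.writes)
}
```

Here the closure's result does not borrow the connection, so a plain R works; the async case differs only in that the returned future does borrow it.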


Now, why does it still not work? That's because the compiler still has to determine whether the closure you pass is valid for some specific lifetime, or for all lifetimes (that's the same decision you just had to make!). There are unfortunately no fixed and reliable rules the compiler can use for this, so it's entirely up to some heuristic. AFAIK the main heuristic is "is this closure being passed as a parameter that's bounded by for<'a> Fn*(...) -> ...? if so it should be valid for any lifetime". Unfortunately in your case the bound is for<'a> AsyncFn1<...>, which is not Fn*(...) -> ..., even though it requires it as a supertrait! This is a known issue and AFAIK there's no ergonomic solution https://github.com/rust-lang/rust/issues/70263
Your best solution is to follow @kornel's advice and use BoxFuture:

F: for<'conn> FnOnce(&'conn mut SqliteConnection) -> BoxFuture<'conn, Result<T, Box<dyn Error>>>

As you can see, you now do have a bound of the form for<'a> Fn*(...) -> ..., so the compiler should be able to infer that the closure should be valid for any lifetime 'a. If you're wondering why you couldn't have this before: the future returned by your closure needs to depend on the lifetime 'conn, so its type needs to be defined where 'conn already syntactically exists, but there's no place to do so inside a trait bound. The workaround for that is to "hide" this definition in another trait, but that's what breaks inference for the closure.

Thank you all for putting so much effort into solving this problem and providing so many explanations. The good news is that it finally compiles, as long as I clone the data parameter so that it is owned and ownership is transferred to the closure.

Unfortunately, I still cannot fully wrap my head around what is going on here. I re-read the Nomicon and read lots of posts on the topic, but it still doesn't click. I mainly struggle with two things:

Why does the for<'conn> lifetime improve anything, and why do I need BoxFuture?

My theory is the following:
The 'conn lifetime expresses that the BoxFuture is valid for at least as long as conn is valid. Therefore, as long as I await the BoxFuture before conn is dropped (which takes ownership of the inner result and allows the BoxFuture to be dropped afterwards), everything is fine, which is what's happening.
The BoxFuture is necessary because I cannot specify a lifetime if I just use Future.
Because I have a closure capturing data, I have two input parameters (data and conn) and therefore two input lifetimes, so lifetime elision does not kick in. The output lifetime is therefore not assigned automatically by Rust and I need to specify it, which is why I need BoxFuture.

I am afraid this is complete nonsense?
Also, as a side question: am I doing something very exotic, or am I simply very inexperienced in Rust? I don't feel like I am trying anything too crazy here, but right now I cannot imagine ever being able to solve a problem like this by myself.

My next problem is that the code only works as long as the parameters are owned by the function that calls do_transactional and ownership is moved into the closure. However, it would be desirable to pass references to the async function, which was possible before I extracted the do_transactional function.

The code and the error are as follows:

struct SomeService {}

impl SomeService {
    pub async fn do_async(data: &str, conn: &mut SqliteConnection) -> Result<(), Box<dyn Error>> {
        // data would actually be something which cannot trivially be owned, e.g. I am calling a stateful singleton service, which would need to be passed as ref
        Ok(())
    }
}

#[tauri::command]
pub async fn some_command(data: &str, state: tauri::State<'_, AppState>) -> Result<(), String> {
    let pool: &Pool<ConnectionManager<SqliteConnection>> = &state.container.pool;

    let x = do_transactional(pool, move |conn| {
        Box::pin(SomeService::do_async(data, conn))
    });
    x.await.map_err(|err| err.to_string())
}

pub async fn do_transactional<T, F>(pool: &Pool<ConnectionManager<SqliteConnection>>, f: F) -> Result<T, TxError>
    where
        T: Send + 'static,
        F: for<'conn> FnOnce(&'conn mut SqliteConnection) -> BoxFuture<'conn, Result<T, Box<dyn Error>>> + Send + 'static,
{
    let mut pool = pool.get().map_err(|err| TxError::Other(Box::new(err)))?;

    let thread_result: Result<Result<T, TxError>, tauri::Error> = tauri::async_runtime::spawn_blocking(move || {
        let handle = tauri::async_runtime::TokioHandle::current();
        let transaction_result: Result<T, TxError> = pool.transaction(|conn| {
            let result: Result<T, Box<dyn Error>> = handle.block_on(async {
                f(conn).await
            });
            result.map_err(|err| TxError::Other(err))
        });
        transaction_result
    }).await;

    thread_result
        .map_err(|err| TxError::Other(Box::new(err)))
        .and_then(|res| res)
}

error: lifetime may not live long enough
  --> src/commands/tauri_commands.rs:24:9
   |
20 | pub async fn some_command(data: &str, state: tauri::State<'_, AppState>) -> Result<(), String> {
   |                                 - let's call the lifetime of this reference `'1`
...
24 |         Box::pin(SomeService::do_async(data, conn))
   |         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ returning this value requires that `'1` must outlive `'static`

Why is this even a problem? Doesn't the lifetime of &data live for the whole some_command function call?

The 'conn parameter of BoxFuture is not just saying that the BoxFuture is at least valid for as long as conn is valid (you would get even more freedom than this if you use 'static for example, because that would mean it could be valid even until the end of the program). The important detail instead is that it allows the BoxFuture to borrow from the connection. If the BoxFuture wasn't expressed with the 'conn lifetime then it would be required to be valid even after the connection is dropped, which is not the case if your future borrows from it (which is actually what happens).

data doesn't really matter here, you can get the same error even without it. This is not a matter of lifetime elision because that only kicks in when you don't specify the lifetimes in a function signature. With closures it's actually inference of not just lifetimes but also their quantifier (the "is this valid for a specific lifetime only or for every possible lifetime?")

I would say that what you're trying to do is perfectly reasonable, it's just that you've found one of the rough edges of async in Rust and in particular async closures, caused by lack of expressiveness in the current type system and inference bugs in the compiler.

In your original snippet of code you were calling .to_owned() on data before starting the transaction, so you weren't passing refs to the async function, you should be able to do the same now (you could also probably just take a String as function parameter instead of a reference)

The reason you can't send references is due to the use of tauri::async_runtime::spawn_blocking, which require the closure passed to be 'static. This hasn't changed from before, and probably you won't be able to remove this limitation (it's unfortunately another rough edge of async, prefer owned data when possible with async).
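A std-only illustration of that bound: std::thread::spawn requires F: FnOnce() -> T + Send + 'static, the same shape as Tokio's spawn_blocking, so borrowed input has to be turned into owned data first (count_bytes_on_thread is a hypothetical helper):

```rust
use std::thread;

/// Hypothetical helper. `data` is borrowed for a caller-chosen lifetime, so it
/// cannot be captured by a closure that must be 'static; an owned copy can.
pub fn count_bytes_on_thread(data: &str) -> usize {
    let owned: String = data.to_owned();
    // Moving `data` itself into this closure would be rejected:
    // the closure must be 'static, and the borrow is not.
    thread::spawn(move || owned.len())
        .join()
        .expect("worker thread panicked")
}
```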

I guess I am now able to understand what's going on here for the most part. Also for now it seems to be possible to use the do_transactional fn in my project. For all current cases I was able to provide owned parameters.

It's also quite interesting to learn that there still appear to be edge cases in Rust that you can't express, even though it should be possible, due to limitations in the compiler. I am not sure I've encountered this in any other programming language before.

Thanks to all for taking the time to provide such thorough answers! I wish I could pay for some coffee. Please let me know if you ever make it to Munich.

Hmm, I've seen very rough edges or other serious limitations in all the languages I've learned. But in this specific case it is important to be aware of the current state of async Rust:
https://rust-lang.github.io/async-book/01_getting_started/03_state_of_async_rust.html

It looks like this thread might be resolved, but I have some thoughts.

I would personally first try using an async block instead of a closure, here. But because you are trying to pass some context back to the closure (conn) it might be better to block_on and await the future within the closure? I have not tested it, but something like this:

    let result: Result<(), TxError> = do_transactional(pool, move |conn| {
        let handle = tauri::async_runtime::TokioHandle::current();
        handle.block_on(async {
            SomeService::do_async(data, conn).await
        })
    }).await;

The goal is to consume the conn reference within the closure, instead of passing it back to the caller indirectly through a Future. (No idea if this works, but it sounds pretty good as I type...)
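A std-only sketch of this idea, with hypothetical names (Conn, the toy block_on, and a do_transactional that takes a plain synchronous callback); a std thread stands in for spawn_blocking. One caveat with the real thing: Tokio's Handle::block_on panics when called from within an asynchronous execution context, so do_transactional itself would have to call f directly on the blocking thread, not inside another block_on.

```rust
use std::future::Future;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread;

/// Hypothetical stand-in for SqliteConnection.
pub struct Conn {
    pub writes: u32,
}

pub async fn do_async(data: String, conn: &mut Conn) -> Result<usize, String> {
    conn.writes += 1;
    Ok(data.len())
}

struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Toy busy-polling executor standing in for Handle::block_on.
pub fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = std::pin::pin!(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
        thread::yield_now();
    }
}

/// The callback is synchronous, so no future escapes it and the bound needs
/// neither BoxFuture nor an explicit for<'conn>. The connection is moved in
/// (like the pooled connection) and handed back so the caller can inspect it.
pub fn do_transactional<T, F>(mut conn: Conn, f: F) -> (Result<T, String>, Conn)
where
    T: Send + 'static,
    F: FnOnce(&mut Conn) -> Result<T, String> + Send + 'static,
{
    thread::spawn(move || {
        // A real version would run f inside the DB transaction here.
        let result = f(&mut conn);
        (result, conn)
    })
    .join()
    .expect("transaction thread panicked")
}
```

A call looks like do_transactional(conn, move |c| block_on(do_async(data, c))): the future is created and driven to completion inside the closure, so the borrow of the connection never escapes.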

Now you have two nested blocking calls, but that might be the price of admission for trying to callback your way out of Diesel's lack of async transactions...


An alternative proposal

It might be a good time for an interlude to consider putting Diesel transactions in their own thread pool and sending transaction requests through an async channel (the request needs to be Send, but this is basically free if you create your own enum message type) and awaiting the response. I usually send a one-shot Sender through the channel so the thread can trivially send the response back. (Bonus: Your enum messages can trivially own a Sender. It's so nice when things just work without surprises!)

It's a bit harder to describe the control flow with message passing, but the code is (IMHO) much cleaner and gives you far more concurrency than you will ever get out of a blocking thread pool [1]. And also lifetime burdens are basically no longer a thing with this architecture. (Unless you want to send loans for some reason, ugh, good luck with that!)

Here's a sketch of what this could look like for you (based on a project I am maintaining):

use tokio::sync::{mpsc, oneshot};
// Connection, SomeService, TransactionError and AppState are your own types;
// error! is the logging macro from the log or tracing crate.

/// These are all the transaction requests you send to the DB...
enum Message {
    SomeService {
        tx: oneshot::Sender<Result<SomeServiceResponse, TransactionError>>,
        data: String, // This is a placeholder for other arguments that the transaction needs...
    },

    // And others...
    Foo { tx: oneshot::Sender<Result<FooResponse, TransactionError>> },
}

/// This is one of the transaction responses you get from the DB...
struct SomeServiceResponse {
    data: String, // Or whatever! Another placeholder, you get the idea.
}

/// This is another transaction response, but you have to use your imagination.
struct FooResponse;

/// This is one of the worker threads for your thread pool.
/// It receives the connection from the pool, so that callers don't have to worry about it.
fn db_thread(mut c: Connection, mut rx: mpsc::Receiver<Message>) {
    // blocking_recv() returns None once every Sender has been dropped.
    while let Some(msg) = rx.blocking_recv() {
        match msg {
            Message::SomeService { tx, data } => {
                let response: Result<SomeServiceResponse, TransactionError> = c.transaction(|conn| {
                    // Notice this is now a sync call...
                    // And we have all the data needed to perform the transaction.
                    SomeService::do_sync(&data, conn)
                });

                if tx.send(response).is_err() {
                    // There isn't much you can do in this case, log it and move on...
                    error!("Unable to send SomeServiceResponse: rx closed?!");
                }
            }

            // And handle all other messages the same way...
            Message::Foo { tx } => {
                if tx.send(Ok(FooResponse)).is_err() {
                    error!("Unable to send Foo: rx closed?!");
                }
            }
        }
    }
}

/// When you want to send the transaction and receive its response...
async fn some_tauri_command(data: String, state: tauri::State<'_, AppState>) -> anyhow::Result<()> {
    let (tx, rx) = oneshot::channel();

    // Send the transaction request.
    state.tx.send(Message::SomeService { tx, data }).await?;

    // Await the transaction response: the first ? handles a dropped Sender,
    // the second one the transaction's own error.
    let response: SomeServiceResponse = rx.await??;

    // Send the transaction response to the service handler for further processing.
    SomeService::do_async(&response.data).await?;

    Ok(())
}

And, yeah, that's it. It has a bit of ceremony (see below) but the core of it is four lines (!) in your async some_tauri_command() function, a tiny message consumer loop in the db_thread function, and splitting SomeService into two functions: one is sync (it sends the transaction and returns the raw response) and the other is async (it takes a raw response and processes it with any async tasks as necessary). Note that the second half of SomeService can optionally be sync if it doesn't need to await any async functions.

Also, String is really cheap to send between threads. It's only 24 bytes on x86_64! Don't be afraid of it for message passing [2]. And oneshot::Sender is even better. Only 8 bytes!
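The size claims are easy to check with std::mem::size_of. The layout of String is not formally guaranteed, but in practice it is three machine words (pointer, capacity, length), and Arc<str> is a two-word fat pointer (pointer, length):

```rust
use std::mem::size_of;
use std::sync::Arc;

/// Sizes of the handles that actually move through a channel (not of the text).
pub fn handle_sizes() -> (usize, usize) {
    // String: pointer + capacity + length, 24 bytes on x86_64 in practice.
    // Arc<str>: pointer + length, 16 bytes on x86_64.
    (size_of::<String>(), size_of::<Arc<str>>())
}
```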

The one missing piece of info is how you get the long-lived channel connected between the transaction thread pool and the Tauri command thing. And that's up to you! But it probably happens at init time. The channel is just a tokio::sync::mpsc::channel() with an adequate size. The receiver is sent to the thread pool (note that this channel is SC - Single-Consumer - so it needs one channel per thread in the thread pool). The sender is placed into your application state.
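A dependency-free sketch of that wiring, using std::sync::mpsc both for the request channel and, in place of tokio's oneshot, for the reply. Everything here is hypothetical and simplified: the worker's "transaction" just bumps a counter, and the caller blocks on recv() where the async version would await:

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical stand-in for a Diesel connection.
pub struct Conn {
    pub writes: u32,
}

/// Hypothetical request type mirroring Message::SomeService above; the reply
/// Sender plays the role of tokio's oneshot::Sender.
pub enum Message {
    SomeService {
        reply: mpsc::Sender<Result<usize, String>>,
        data: String,
    },
}

/// Worker loop: owns its connection and serves requests until every Sender is dropped.
fn db_thread(mut conn: Conn, rx: mpsc::Receiver<Message>) -> u32 {
    for msg in rx {
        match msg {
            Message::SomeService { reply, data } => {
                // The synchronous "transaction" with exclusive access to the connection.
                conn.writes += 1;
                // If the caller stopped waiting, there is nothing to do but move on.
                let _ = reply.send(Ok(data.len()));
            }
        }
    }
    conn.writes
}

/// Init time: the Sender goes into the app state, the Receiver to the worker thread.
pub fn start_db_worker() -> (mpsc::Sender<Message>, thread::JoinHandle<u32>) {
    let (tx, rx) = mpsc::channel();
    let handle = thread::spawn(move || db_thread(Conn { writes: 0 }, rx));
    (tx, handle)
}

/// Per request: send the message with a fresh reply channel, then wait for the answer.
pub fn request_some_service(tx: &mpsc::Sender<Message>, data: String) -> Result<usize, String> {
    let (reply, response) = mpsc::channel();
    tx.send(Message::SomeService { reply, data })
        .map_err(|_| String::from("db worker has shut down"))?;
    response
        .recv()
        .map_err(|_| String::from("db worker dropped the request"))?
}
```

Dropping the last Sender ends the worker's loop, which gives a clean shutdown path for free.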

And then for handling multiple threads in the transaction thread pool you need to consider either trying an async mpmc channel instead of mpsc, or making the callers use some scheduling policy (like round robin) when selecting a channel to send the request on.
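A round-robin policy can be as small as an atomic counter over one sender per worker. This hypothetical RoundRobin uses std channels; with Tokio's mpsc, send is async and the method would have to be async as well:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc;

/// Hypothetical round-robin dispatcher over one channel per worker thread.
pub struct RoundRobin<T> {
    senders: Vec<mpsc::Sender<T>>,
    next: AtomicUsize,
}

impl<T> RoundRobin<T> {
    pub fn new(senders: Vec<mpsc::Sender<T>>) -> Self {
        assert!(!senders.is_empty(), "need at least one worker channel");
        Self { senders, next: AtomicUsize::new(0) }
    }

    /// Picks the next worker in turn; takes &self so it can live in shared app state.
    pub fn send(&self, msg: T) -> Result<(), mpsc::SendError<T>> {
        let i = self.next.fetch_add(1, Ordering::Relaxed) % self.senders.len();
        self.senders[i].send(msg)
    }
}
```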

Also, don't forget bb8 and deadpool! You don't necessarily need to write your own thread pool and scheduling policies. Most of these concerns can be delegated to either of these crates. Note that deadpool likes to do callback-y stuff for blocking backends, which will put you right back into all the fun you started with. But deadpool-diesel is definitely a thing.


  1. To qualify this statement, the Tokio blocking thread pool is a limited resource. It is shared with all blocking tasks, not just those related to transactions. Its size is configurable, but you will always have this contention with other blocking tasks regardless.

  2. If 24 bytes is a real concern, consider Arc<str>. It's 16 bytes...