Futures and Closures, capturing variables

Hello everybody – i whish you all a nice Sunday :slight_smile:

Last week i mostly solved my problem with your help with how to read compiler error messages the right way when dealing with Futures. The thing i want to do – having an actix web server and inserting data with diesel while having a transaction with isolation level serializable and retry it if it fails with 40001(SerializationFailure) – is mostly working.

Now i want the code to be nice and learn something on the way. My main Problem – at least its how it appears to me – is that i now have a lot of closures paired with futures and i need my input data flow "down the stream" of that closures. I am stuck at a place that feels a little bit ugly and i want to get rid of it because of two reasons, first i think it is somehow superfluous and secondly it prevents me from making a nice abstraction. I have distilled the problem down to this playground example. The main attraction here is the register function

fn register(new_user: NewUser) -> impl Future<Item = usize, Error = usize> {
    result(Ok::<usize, usize>(1)) //just simulating input validation
        //.from_err()
        .and_then(|_| {
            retry(
                move || {
                    //how to avoid having this clone, or "parking" new_user
                    //here altogether? This is also the place where i have to
                    //"park" my diesel connection pool in the "real" code
                    let new_user = new_user.clone();
                    web_block(move || diesel_insert(&new_user))
                },
                handle_serialize_error,
            )
        })
}

retry here needs a FnMut because we want to call the closure multiple times eventually. web_block wants to have a FnOnce, similar to and_then. retryand web_block are crafted after the methods in Futures-retry and Actix-web respectively. I put the "real implementation" at the end of this post, but this playground example should behave just the same.

So because the Futures can run long after the register function has returned, the new_user variable does not live long enough inside the register function for the diesel_insert function to have a valid/living new_user borrow. So i need to move it first inside the retry closure and "park" it here because at this stage i want to use it multiple times and then move it into the web_block closure. As i said i find this somehow superfluous and it prevents me from having an abstraction like this

//I want a nice abstraction that somewhat looks like this

fn register_nice(new_user: NewUser) -> impl Future<Item = usize, Error = usize> {
    result(Ok::<usize, usize>(1)) //just simulating input validation
        //.from_err()
        .and_then(|_| {
            retry_db(|| diesel_insert(&new_user))
        })
}

fn retry_db<FN>(fun: FN) -> impl Future<Item = usize, Error = usize>
where
    FN: FnOnce() -> Result<usize, usize>,
{
    retry(|| web_block(|| fun()), handle_serialize_error)
}

Because with this i don't see a way to "park" those variables. Mind you, i could craft a specific retry_db function for every pair of parameters such "register_nice" function could have but ultimately i want to have a variable amount to be "parked" in a generic way so i only have to code this function once, otherwise this exercise would be pretty useless. In "real life" there is also the database connection to be parked there. i haven't really found a nice way around this, i think i understand why (the variables needs to live at the point where we want to use the variables multiple times, because the FnOnce - web_block consumes it every time)

As promised here is the "original Code". Just for reference but i think all reasoning can be done with the code above and the playground example

pub fn register(
    register_user: Json<WsRegisterUser>,
    db_pool: Data<DbPool>,
) -> impl Future<Item = HttpResponse, Error = ApiError> {
    result(register_user.validate())
        //.map_err(|_| { ApiError::BadRequest(WsResponseCode::BadRequest, "email or password invalid".into()) } )
        .from_err()
        .and_then(move |_| {
            FutureRetry::new(
                move || {
                    //we need to hold onto these variables here in this closure, because we want
                    //to use these values multiple times inside the FnOnce web::block(..)
                    let new_user: DbNewUser = register_user.clone().into(); //impl from with hash
                    let db_pool = db_pool.clone();

                    actix_web::web::block(move || {
                        let conn: DbConn = db_pool.get().unwrap(); //TODO(dustin) remove unwrap
                        conn.build_transaction()
                            .serializable()
                            .run(|| diesel::insert_into(users).values(&new_user).execute(&conn))
                    })
                },
                handle_serialize_error,
            )
            .from_err()
            //.map_err(|_| { ApiError::BadRequest(WsResponseCode::BadRequest, "serialize error".into()) } )
            .and_then(|x: usize| {
                Ok(WsResponse::<()>::wrap_ok_none()) //TODO(dustin): find a way to remove type annotation `::<()>`
            })
        })
}

What about something like this?

fn register(new_user: NewUser) -> impl Future<Item = usize, Error = usize> {
    result(Ok::<usize, usize>(1)) //just simulating input validation
        //.from_err()
        .and_then(move |_| {
            let new_user = new_user; // move it into this closure
            retry(
                || {
                    web_block(|| diesel_insert(&new_user))
                },
                handle_serialize_error,
            )
        })
}

Hi @alice, thanks for your reply! You can try this in the fully functional playground example i have provided (Sorry i see that i have provided the wrong link, this one should compile). And to answer your question: unfortunately the compiler is yelling at me :slight_smile:

EDIT: i have corrected the links in the original post, sorry for that.

With your suggestion i get:

error[E0373]: closure may outlive the current function, but it borrows `new_user`, which is owned by the current function
  --> src/main.rs:46:17
   |
46 |                 || {
   |                 ^^ may outlive borrowed value `new_user`
47 |                     web_block(|| diesel_insert(&new_user))
   |                                                 -------- `new_user` is borrowed here
   |
note: closure is returned here
  --> src/main.rs:45:13
   |
45 | /             retry(
46 | |                 || {
47 | |                     web_block(|| diesel_insert(&new_user))
48 | |                 },
49 | |                 handle_serialize_error,
50 | |             )
   | |_____________^
help: to force the closure to take ownership of `new_user` (and any other referenced variables), use the `move` keyword
   |
46 |                 move || {
   |                 ^^^^^^^

my current approach is with a Macro:

#[macro_export]
macro_rules! db_retry {
    (<$($vars:ident),*>, $closure:tt) => {
        retry(move || {
            $(let $vars = $vars.clone();)*
            web_block(move || $closure($($vars,)*))
        }, handle_serialize_error)
    };
}

so i can write something like this:

fn register_nicer(new_user: NewUser) -> impl Future<Item = usize, Error = usize> {
    result(Ok::<usize, usize>(1)) //just simulating input validation
        //.from_err()
        .and_then(|_| {
            db_retry!(<new_user>, (|nu| diesel_insert(&nu)) )
        })
}

but i don't know if i am super happy with it. In my "real" Code it looks something like this

#[macro_export]
macro_rules! db_retry {
    (<$($vars:ident),*>,<$db:ident>, $closure:tt) => {
        FutureRetry::new(move || {
            $(let $vars = $vars.clone();)*
            let $db = $db.clone();
            actix_web::web::block(move || {
                serial_conn($db, |conn: &DbConn| {
                   $closure( $($vars,)* conn)
                })
            })
        }, handle_serialize_error)
    };
}

pub fn register(
    register_user: Json<WsRegisterUser>,
    db_pool: Data<DbPool>,
) -> impl Future<Item = HttpResponse, Error = ApiError> {
    result(register_user.validate())
        //.map_err(|_| { ApiError::BadRequest(WsResponseCode::BadRequest, "email or password invalid".into()) } )
        .from_err()
        .and_then(move |_| {
            let dbn: DbNewUser = register_user.into_inner().into();

            db_retry!(
                <dbn>,<db_pool>,
                (|new_user: DbNewUser, conn: &DbConn| {
                    diesel::insert_into(users).values(&new_user).execute(conn)
                })
            )
            .from_err()
            //.map_err(|_| { ApiError::BadRequest(WsResponseCode::BadRequest, "serialize error".into()) } )
            .and_then(|x: usize| {
                Ok(WsResponse::<()>::wrap_ok_none()) //TODO(dustin): find a way to remove type annotation `::<()>`
            })
        })
}

Don't know if this is really an improvement. The Macro looks somewhat cryptic – but that has to do with my lack of Macros Wizardry i guess (unnecessary <..>) and it just overall looks a little bit cryptic, because i had to handle the DB variable explicitly so i can use the serial_conn function that is just a little wrapper over

pub fn serial_conn<T, E, F>(db_pool: Data<DbPool>, f: F) -> Result<T, E>
where
    F: FnOnce(&DbConn) -> Result<T, E>,
    E: From<diesel::result::Error>,
{
    let conn: DbConn = db_pool.get().unwrap(); //TODO(dustin) remove unwrap
    conn.build_transaction().serializable().run(|| f(&conn))
}

just playing around to make the original code a little bit more convenient. I think i just can't get around the original problem of "parking" the values inside the FnMut Closure.

This topic was automatically closed 90 days after the last reply. New replies are no longer allowed.