[solved] Lost in type puzzle with futures and Fn traits

I am trying to build a web service with actix-web + diesel. Everything has worked so far, but I have a problem. To avoid the XY problem, here is the use case I am trying to solve: I have a database operation that needs the transaction isolation level serializable. There could be a transaction Y running that makes the current transaction X fail. All I want to do is wait a little, retry X, and hope Y is finished when X is applied again.

I tried to boil it down as much as I can while preserving the original compiler error. I think one can boil it down much further, but I don't know exactly what the problem is, so here we are. I use the futures_retry crate for the retry functionality. Here is the example:

Cargo.toml:

[package]
name = "fn_future_test"
version = "0.1.0"
edition = "2018"

[dependencies]
actix-web = "1.0.0"
futures = "0.1.27"
futures-retry = "0.3.3"
diesel = { version = "1.0.0", features = ["postgres", "r2d2"] }

main.rs:

use actix_web::error::ErrorBadRequest;
use actix_web::{web, App, Error, HttpResponse, HttpServer};
use futures::future::result;
use futures::Future;
use futures_retry::{FutureRetry, RetryPolicy, RetryPolicy::WaitRetry};
use std::time::Duration;

fn handle_serialize_error(e: diesel::result::Error) -> RetryPolicy<diesel::result::Error> {
    //retry only if we have a SerializationError, everything else should fail immediately
    match e {
        diesel::result::Error::DatabaseError(
            diesel::result::DatabaseErrorKind::SerializationFailure,
            msg,
        ) => WaitRetry(Duration::from_millis(500)),
        _ => RetryPolicy::ForwardError(e),
    }
}

//would have input parameters like a Json<NewUser> that gets validated and a database connection
fn handler() -> impl Future<Item = HttpResponse, Error = Error> {
    result(Ok(1)) //just simulating input validation
        //.map_err(|err| ErrorBadRequest(err)) //in case the validation went wrong
        .and_then(|val| {
            let db_retry = FutureRetry::new(
                || {
                    actix_web::web::block(|| {
                        //make an insert with diesel that could yield a SerializationFailure inside a
                        //transaction something like:
                        //
                        //let conn = &db_pool.get().unwrap();
                        //conn.build_transaction()
                        //    .serializable()
                        //    .run(|| {
                        //        diesel::insert_into(users)
                        //            .values(&new_user)
                        //            .execute(conn)
                        //    })
                        //but just simulate an error here
                        Err(diesel::result::Error::DatabaseError(
                            diesel::result::DatabaseErrorKind::SerializationFailure,
                            Box::new("test error".to_string()),
                        ))
                    })
                },
                handle_serialize_error,
            );

            db_retry
                .map_err(|_err| ErrorBadRequest("error".into()))
                .and_then(|x: usize| Ok(HttpResponse::Ok().body("every thing is fine")))
        })
}

fn main() {
    println!("starting server");

    HttpServer::new(|| App::new().route("/", web::get().to_async(handler)))
        .bind("127.0.0.1:8000")
        .expect("Can not bind to port 8000")
        .run()
        .unwrap();
}

The compiler output I am getting is:

    Checking fn_future_test v0.1.0 (/home/naikon/Documents/dev/test/rust/fn_future_test)
error[E0599]: no method named `map_err` found for type `futures_retry::future::FutureRetry<[closure@src/main.rs:25:17: 43:18], fn(diesel::result::Error) -> futures_retry::RetryPolicy<diesel::result::Error> {handle_serialize_error}>` in the current scope
  --> src/main.rs:48:18
   |
48 |                 .map_err(|_err| ErrorBadRequest("error".into()))
   |                  ^^^^^^^
   |
   = note: the method `map_err` exists but the following trait bounds were not satisfied:
           `&mut futures_retry::future::FutureRetry<[closure@src/main.rs:25:17: 43:18], fn(diesel::result::Error) -> futures_retry::RetryPolicy<diesel::result::Error> {handle_serialize_error}> : futures::future::Future`
           `futures_retry::future::FutureRetry<[closure@src/main.rs:25:17: 43:18], fn(diesel::result::Error) -> futures_retry::RetryPolicy<diesel::result::Error> {handle_serialize_error}> : futures::future::Future`

error: aborting due to previous error

For more information about this error, try `rustc --explain E0599`.
error: Could not compile `fn_future_test`.

To learn more, run the command again with --verbose.

My guess is that it has to do with the Fn traits of the closures given to FutureRetry::new: the corresponding FutureFactory takes an FnMut, while actix_web::web::block takes an FnOnce. I was thinking that the factory would just create a new FnOnce every time it is invoked again, but it "leaks" somehow? I just don't know. I can't really figure out where the &mut futures_retry::future::FutureRetry<... that the compiler is complaining about is coming from :confused:
I hope someone can give me a hand. I think I could try another retry crate that is not based on futures, because the actual database operation is sync anyway (as diesel is currently structured, hence the actix_web::web::block). But regardless of any solution, I want to know what the root cause is in this specific example, to learn something. Maybe someone could also boil this down to a very simple example, because it is confusing on its own: diesel, actix-web and futures all have very verbose types that make it hard to reason about the code and to follow the types in the docs. Thanks to everybody who made it this far down this wall of text :slight_smile:

Mhh.. I think it has nothing really to do with the Fn traits in actix_web::web::block or with the FutureFactory taking different Fn traits. I boiled it down to a version where this works:

...
fn handle_serialize_error(e: String) -> RetryPolicy<String> {
    RetryPolicy::ForwardError(e)
}

fn handler() -> impl Future<Item = HttpResponse, Error = Error> {
    let db_retry = FutureRetry::new(
        //|| result(Err::<usize, String>("bad".into())),
        || result(Ok::<usize, String>(1)),
        handle_serialize_error,
    );

    db_retry
        .map_err(|_err| actix_web::error::ErrorBadRequest("bad request"))
        .and_then(|_x: usize| Ok(HttpResponse::Ok().body("every thing is fine")))
}
...

but this does not:

...
fn handle_serialize_error(e: diesel::result::Error) -> RetryPolicy<diesel::result::Error> {
    RetryPolicy::ForwardError(e)
}

fn handler() -> impl Future<Item = HttpResponse, Error = Error> {
    let db_retry = FutureRetry::new(
        || result(Ok::<usize, diesel::result::DatabaseErrorKind>(1)),
        handle_serialize_error,
    );

    db_retry
        .map_err(|_err| actix_web::error::ErrorBadRequest("bad request"))
        .and_then(|_x: usize| Ok(HttpResponse::Ok().body("every thing is fine")))
}
...

and fails with the same error message as before.

Fortunately it has nothing to do with the Fn traits. I was just using the wrong type in the handle_serialize_error function: actix_web::web::block wraps the closure's error in actix_web::error::BlockingError, so the handler has to accept BlockingError<diesel::result::Error> rather than diesel::result::Error. I think I have to learn to chase the types better when using futures; the compiler messages look very different from the usual ones you get when a function expects one type as a parameter but is given another. For completeness, here is a running example that emulates a serialization error 80 percent of the time and eventually succeeds.

I think I just needed to write this down somewhere to solve it myself :slight_smile:

use actix_web::error::ErrorBadRequest;
use actix_web::{web, App, Error, HttpResponse, HttpServer};
use futures::future::result;
use futures::Future;
use futures_retry::{FutureRetry, RetryPolicy, RetryPolicy::WaitRetry};
use rand::Rng; //needs the rand crate in Cargo.toml in addition to the dependencies listed above
use std::time::Duration;

fn handle_serialize_error(
    e: actix_web::error::BlockingError<diesel::result::Error>,
) -> RetryPolicy<actix_web::error::BlockingError<diesel::result::Error>> {
    println!("check error");
    match &e {
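        //the closure passed to web::block returned an error
        actix_web::error::BlockingError::Error(diesel_err) => match diesel_err {
            diesel::result::Error::DatabaseError(
                diesel::result::DatabaseErrorKind::SerializationFailure,
                _msg,
            ) => WaitRetry(Duration::from_millis(500)),
            _ => RetryPolicy::ForwardError(e),
        },
        //use the full path: a bare `Canceled` would be a catch-all binding, not the variant
        actix_web::error::BlockingError::Canceled => RetryPolicy::ForwardError(e),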
    }
}

//would have input parameters like a Json<NewUser> that gets validated and a database connection
fn handler1() -> impl Future<Item = HttpResponse, Error = Error> {
    result(Ok(1)) //just simulating input validation
        //.map_err(|err| ErrorBadRequest(err)) //in case the validation went wrong
        .and_then(|_val| {
            let db_retry = FutureRetry::new(
                || {
                    actix_web::web::block(|| {
                        //make an insert with diesel that could yield a SerializationFailure inside a
                        //transaction something like:
                        //
                        //let conn = &db_pool.get().unwrap();
                        //conn.build_transaction()
                        //    .serializable()
                        //    .run(|| {
                        //        diesel::insert_into(users)
                        //            .values(&new_user)
                        //            .execute(conn)
                        //    })

                        //let this fail 80 percent of the time
                        let mut rng = rand::thread_rng();
                        if dbg!(rng.gen_range(0, 10)) >= 8 {
                            Ok::<usize, diesel::result::Error>(1)
                        } else {
                            Err::<usize, diesel::result::Error>(
                                diesel::result::Error::DatabaseError(
                                    diesel::result::DatabaseErrorKind::SerializationFailure,
                                    Box::new("test error".to_string()),
                                ),
                            )
                        }
                    })
                },
                handle_serialize_error,
            );

            db_retry
                .map_err(|err| ErrorBadRequest(format!("error: {}", err)))
                .and_then(|x: usize| Ok(HttpResponse::Ok().body("every thing is fine")))
        })
}

fn main() {
    println!("starting server");

    HttpServer::new(|| App::new().route("/handle", web::get().to_async(handler1)))
        .bind("127.0.0.1:8000")
        .expect("Can not bind to port 8000")
        .run()
        .unwrap();
}
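
Since the question asked for a smaller reproduction, here is a rough sketch of the same trap using only the standard library. Every name in it (Job, Ready, Policy, Handler, Retry, forward_string) is made up for illustration and only loosely mimics futures 0.1 and futures_retry; it is not how those crates are implemented. The assumption, taken from the compiler output above, is that the wrapper can be constructed with any handler but only implements the future-like trait when the handler's input type equals the error type of whatever the factory produces.

```rust
// Toy model only: all types here are hypothetical stand-ins, not real crate APIs.

/// Stand-in for a futures 0.1 `Future`: an item type, an error type, a way to run.
trait Job {
    type Item;
    type Error;
    fn run(self) -> Result<Self::Item, Self::Error>;
}

/// Stand-in for the future produced by the factory closure.
struct Ready<T, E>(Result<T, E>);

impl<T, E> Job for Ready<T, E> {
    type Item = T;
    type Error = E;
    fn run(self) -> Result<T, E> {
        self.0
    }
}

/// Stand-in for `RetryPolicy`.
enum Policy<E> {
    Retry,
    Forward(E),
}

/// An error handler receives the job's error type `In` and picks a policy.
trait Handler<In> {
    type Out;
    fn handle(&mut self, error: In) -> Policy<Self::Out>;
}

impl<In, Out, F> Handler<In> for F
where
    F: FnMut(In) -> Policy<Out>,
{
    type Out = Out;
    fn handle(&mut self, error: In) -> Policy<Out> {
        self(error)
    }
}

struct Retry<F, H> {
    factory: F,
    handler: H,
}

impl<F, H> Retry<F, H> {
    // No bounds here, so a mismatched handler is accepted at construction time.
    fn new(factory: F, handler: H) -> Self {
        Retry { factory, handler }
    }
}

// `Retry` is only a `Job` if the handler accepts exactly `J::Error`.
impl<F, J, H> Job for Retry<F, H>
where
    F: FnMut() -> J,
    J: Job,
    H: Handler<J::Error>,
{
    type Item = J::Item;
    type Error = <H as Handler<J::Error>>::Out;
    fn run(mut self) -> Result<Self::Item, Self::Error> {
        loop {
            match (self.factory)().run() {
                Ok(item) => return Ok(item),
                Err(error) => match self.handler.handle(error) {
                    Policy::Retry => continue,
                    Policy::Forward(error) => return Err(error),
                },
            }
        }
    }
}

fn forward_string(error: String) -> Policy<String> {
    Policy::Forward(error)
}

fn main() {
    // Handler input (String) matches the job's error type (String): `run` exists.
    let ok = Retry::new(|| Ready(Ok::<usize, String>(1)), forward_string).run();
    println!("{:?}", ok);

    // Handler input (String) vs. job error type (u8): `Retry<..>` is not a `Job`,
    // so uncommenting the next line should be rejected by the compiler.
    // let bad = Retry::new(|| Ready(Ok::<usize, u8>(1)), forward_string).run();
}
```

The mismatch is only reported where the trait method is called, not where the wrapper is built, which is why the message points at map_err and lists unsatisfied trait bounds instead of saying "expected type A, found type B".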

Glad you found a solution. I admit I didn't look much at the code, but from the original description of your problem I wonder why you don't use a spinlock or, if you don't want to block, return Poll::Pending and then wake up the task when the DB is ready to do the next transaction.

That would avoid retry mechanisms.

Thanks for the follow-up! There is a reason I mentioned the XY problem at the beginning. I have to admit that I am very inexperienced, or should I say unaware, of how to manage concurrent access to a database correctly. I have written many web applications without even caring about what happens when multiple users use my web services at the same time. This was never a real problem (or at least I never noticed corrupted data such as lost updates), with very few exceptions. For a booking tool I was building (in Java), for example, I just put a mutex/semaphore in place so that concurrent requests would not garble the data; that problem was easily reproducible, and it made sense how it happened.

That held until recently, when I had a very hard-to-debug problem: the application state stored in the database was seriously messed up every now and then. It was a round-based game (about 5 minutes per round, ~10 rounds played) played by many users concurrently (around 300). Each team (30 teams of ~10 players) had to collect points by solving various problems until the round ended. A timer running in the background triggered various things when a round ended: closing the round, summing up points, and changing the state of the groups so they moved on to the next obstacles. The groups could also close a round themselves by solving all of its problems while there was still time left.

This was the first time I really started to read into transaction isolation levels. I think I now basically know what each isolation level does and what the various anomalies are, but I don't really know when to apply which level. I haven't found a serious source with useful guidance on which level may be used in which situation without ending up with corrupted data (inconsistent state/broken invariants) because of data races. I have read some papers that made me seriously question whether I would always notice the situations where I have to use the isolation level serializable, for example.

I could use a spinlock, like I did with the semaphore in my Java booking example, but blocking a single web service/function/transaction to prevent concurrent access is not enough in some cases, such as the game I vaguely described. There are multiple web services/functions involved that can break the state, and blocking a single one does not help: one needs to block every web service (A, B, C, ...) that is potentially involved whenever any of them is invoked.

I guess if I strove to build something with Poll::Pending, I would need to somehow detect, in my application logic, the conditions that could lead to the corruption. That sounds like a very hard task.

I am currently heading in a direction where all my web services/transactions use the isolation level serializable and are retried if they fail (only when the appropriate error happens, not on e.g. constraint violations), and I just take the performance hit. I don't even know how big the effect really is; I have never found good benchmarks comparing, say, read committed and serializable, so it is very unclear to me how hard the penalty is. I defer performance issues to the point where I actually have them; I value consistent data more. This is maybe a stupid thing to do, but my current knowledge of the topic leads me to this conclusion: "When in doubt (which I am), serialize!"
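
Because the database call is synchronous anyway, the "retry only on the appropriate error" rule can also be sketched without any futures. The following is a minimal illustration using only the standard library; DbError and retry_serializable are hypothetical names standing in for diesel::result::Error and for whatever helper one would actually write, and the attempt limit and wait time are arbitrary.

```rust
use std::thread;
use std::time::Duration;

/// Hypothetical error type standing in for `diesel::result::Error`.
#[derive(Debug, PartialEq)]
enum DbError {
    SerializationFailure,
    UniqueViolation,
}

/// Calls `op` until it succeeds, retrying only on `SerializationFailure`
/// and giving up after `max_attempts` calls. Any other error is returned
/// immediately, because retrying e.g. a constraint violation cannot help.
fn retry_serializable<T, F>(max_attempts: u32, wait: Duration, mut op: F) -> Result<T, DbError>
where
    F: FnMut() -> Result<T, DbError>,
{
    let mut attempt = 1;
    loop {
        match op() {
            Err(DbError::SerializationFailure) if attempt < max_attempts => {
                attempt += 1;
                thread::sleep(wait);
            }
            // Success, a non-retryable error, or the last failed attempt.
            other => return other,
        }
    }
}

fn main() {
    // Simulated transaction: fails twice with a serialization failure, then succeeds.
    let mut calls = 0;
    let outcome = retry_serializable(5, Duration::from_millis(10), || {
        calls += 1;
        if calls < 3 {
            Err(DbError::SerializationFailure)
        } else {
            Ok(calls)
        }
    });
    println!("{:?} after {} calls", outcome, calls);

    // A constraint violation is forwarded on the first attempt.
    let failed: Result<(), DbError> =
        retry_serializable(5, Duration::from_millis(10), || Err(DbError::UniqueViolation));
    println!("{:?}", failed);
}
```

In an actix-web handler such a loop would have to run inside the blocking closure rather than on the event loop thread, since it sleeps between attempts.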

I would appreciate good sources on how to detect in practice when to use which isolation level. At my current state of knowledge I feel unable to prove when read committed is enough and when I have to choose repeatable read or even serializable.

Ok, I see, I admit I'm not a DB expert. So is the problem that when you do a write to the DB the situation might already have changed and the writing task would have had to take into account the new situation before deciding to write something, and as it doesn't the general state of the world no longer makes sense/is consistent?
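
The situation described here is essentially a lost update. As a toy illustration (plain Rust, no database), the sketch below lets two "transactions" X and Y read the same row and then write back a value computed from their stale snapshot. Row, Conflict, blind_write and checked_write are invented for this example; the version counter is just a simple optimistic check and is not meant to describe how PostgreSQL implements serializable isolation.

```rust
/// Hypothetical row with a version counter that is bumped on every write.
#[derive(Debug, Clone, Copy, PartialEq)]
struct Row {
    points: u32,
    version: u64,
}

/// Stand-in for a serialization failure.
#[derive(Debug, PartialEq)]
struct Conflict;

/// Writes a value computed from `snapshot` without checking for changes.
fn blind_write(row: &mut Row, snapshot: Row, delta: u32) {
    row.points = snapshot.points + delta;
    row.version += 1;
}

/// Refuses the write if the row changed after `snapshot` was taken.
fn checked_write(row: &mut Row, snapshot: Row, delta: u32) -> Result<(), Conflict> {
    if row.version != snapshot.version {
        return Err(Conflict);
    }
    row.points = snapshot.points + delta;
    row.version += 1;
    Ok(())
}

/// X and Y both read 10 points; Y adds 5, then X adds 3 to its stale snapshot.
fn lost_update_demo() -> u32 {
    let mut row = Row { points: 10, version: 0 };
    let seen_by_x = row;
    let seen_by_y = row;
    blind_write(&mut row, seen_by_y, 5);
    blind_write(&mut row, seen_by_x, 3); // overwrites Y's update
    row.points
}

/// Same interleaving, but X's stale write is rejected and retried on fresh data.
fn retried_demo() -> u32 {
    let mut row = Row { points: 10, version: 0 };
    let seen_by_x = row;
    let seen_by_y = row;
    checked_write(&mut row, seen_by_y, 5).expect("first write sees no conflict");
    if checked_write(&mut row, seen_by_x, 3).is_err() {
        let fresh = row; // re-read, then apply X again
        checked_write(&mut row, fresh, 3).expect("retry runs on a fresh snapshot");
    }
    row.points
}

fn main() {
    println!("blind writes:        {}", lost_update_demo());
    println!("checked and retried: {}", retried_demo());
}
```

With blind writes, Y's 5 points vanish (the row ends at 13 instead of 18); with the check plus a retry, both updates survive, which is the same shape as "fail with a serialization error, then retry X".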

It is true that async code is harder to debug. It also might be hard to implement if the DB Driver you use (like Diesel) only exposes an error interface.

From reading up on isolation levels, it does look like it fits the philosophy of async Rust well. With serializable transactions there is waiting time. This time could be spent in Poll::Pending, with a wakeup, but that kind of requires the DB driver to support it. It seems like it would be the neatest design for an async program.

Me neither :smiley:

Exactly! And my main problem is: how do I know, for sure, whether that is the case? I am so spoiled by Rust and its strictness in preventing me from doing stupid data-race stuff that I constantly fear messing things up in C/C++ (and even in my Java code), to the point that I completely avoid either using concurrency in those languages or the languages as a whole :smiley: Sadly the Rust compiler cannot look past the boundary into the database.

For that reason I am currently sticking with what the Postgres docs say.
https://www.postgresql.org/docs/11/applevel-consistency.html

13.4.1. Enforcing Consistency With Serializable Transactions
If the Serializable transaction isolation level is used for all writes and for all reads which need a consistent view of the data, no other effort is required to ensure consistency. Software from other environments which is written to use serializable transactions to ensure consistency should “just work” in this regard in PostgreSQL.
When using this technique, it will avoid creating an unnecessary burden for application programmers if the application software goes through a framework which automatically retries transactions which are rolled back with a serialization failure. It may be a good idea to set default_transaction_isolation to serializable. It would also be wise to take some action to ensure that no other transaction isolation level is used, either inadvertently or to subvert integrity checks, through checks of the transaction isolation level in triggers.

I like the "just works" and "will avoid unnecessary burden" part :stuck_out_tongue:

I don't know enough about how serializable really works to agree on that. I remember reading that READ ONLY DEFERRABLE transactions can wait until it is safe to commit without rolling back, but I don't know if that applies to writing transactions as well. This sounds like a feature request for Diesel, but because of my lack of knowledge about the topic I am a little unsure whether I could express the problem and solution space very well.