Expected a closure that implements the `Fn` trait, but this closure only implements `FnMut`

I'm using the warp and redis libraries and I want to set a redis key for reach request. Here is the code:

use redis::Commands;
use std::net::SocketAddr;
use warp::{Filter, http::Response};

#[tokio::main] 
async fn main() {
    let addr = SocketAddr::from(([127, 0, 0, 1], 8080));
    let redisclient = redis::Client::open("redis://127.0.0.1:6379").unwrap();
    let mut rediscon = redisclient.get_connection().unwrap();

    let routes = warp::path("test")
        .map(|| {
            let _server_success: String = rediscon.set("test", "test").unwrap();

            Response::builder()
                .body(format!("{}", "test"))
        });

    warp::serve(routes).bind(addr).await;
}

The problem is that I'm getting an error at line 12 "expected a closure that implements the Fn trait, but this closure only implements FnMut". I dont know what I'm doing wrong but an explanation would be great. If the way I'm doing it is wrong please state how I'm supposed to do it.

Thanks

I'm not familiar with the crates involved, but looked at the API docs a bit. It looks like warp::Filter::map expects a closure that is going to be executed in a multi-threaded environment, hence the need for Fn instead of FnMut.

For multi-threaded use, redis seems to offer MultiplexedConnection, or ConnectionManager. (I don't actually know how to properly construct the former.)

You will most likely need to add move to your closure anyways in order to support a 'static bound for warp::serve. I.e. .map(move || { ... }). Inside of the closure, you make sure it stays a Fn by cloning the captured MultiplexedConnection/ConnectionManager and then working with that clone. If you want to do a query in the .map, it might make sense to use .then instead of .map, and AsyncCommands in order to write async code.


To give some more details to how the above relates to Fn/FnMut:

A closure that implements Fn is a closure that can be called with shared (&self) access to the closure, a closure that only implements FnMut but not Fn can only be called with exclusive mutable (&mut self) access to the closure. This means that via FnMut the closure cannot be called multiple times in parallel, e.g. on multiple threads, because &mut _ references are always exclusive; but in turn, FnMut allows for mutable access to captured variables.

Your closure calls a &mut self method on a captured variable, that is: it calls rediscon.set(...), which means it can implement FnMut, but Fn is no longer possible.

If, as I described above, you were to use e.g. a ConnectionsManager, and called let my_manager = manager.clone();, then that manager.clone() call would be a &self method on the captured managers variable; further down in the closure, &mut-methods on the variable my_manager would still be allowed as that would be a variable that's local to the closure.


Regarding the concrete solution of using MultiplexedConnection/ConnectionManager, feel free to alternatively consider following different advice from anyone more knowledgeable than me about what the best practical solution here would be; as mentioned, I've just briefly looked at the API, so I cannot answer and questions about what the best trade-off would be.

2 Likes

Warp will respond to multiple responses in parallel which means there may be multiple calls to your handler function at the same time[1]. Because it can be called multiple times in parallel, the closure can't mutate any of the variables it captures. Connection methods take &mut self, so we aren't allowed to create a single connection inside main() and let the handler function use it.

This is what the "Fn vs FnMut" error message is trying to tell you.

I haven't used warp and redis together before, but I guessing the solution is to pass the Client to the closure and create a new connection every time the handler is called.

#[tokio::main] 
async fn main() {
    let addr = SocketAddr::from(([127, 0, 0, 1], 8080));
    let redisclient = redis::Client::open("redis://127.0.0.1:6379").unwrap();

    let routes = warp::path("test")
        .map(|| {
            let mut rediscon = redisclient.get_connection().unwrap();
            let _server_success: String = rediscon.set("test", "test").unwrap();

            Response::builder()
                .body(format!("{}", "test"))
        });

    warp::serve(routes).bind(addr).await;
}

  1. Let's skip the "concurrency vs parallelism" details for now. Chances are if you understand that much you already know why the OP is seeing this error. ↩ī¸Ž

1 Like

Thanks for the response.

That makes sense but now I'm faced with another issue. "expected a closure that implements the Fn trait, but this closure only implements FnOnce"

I switched the redis to the async one and used ConnectionManager.

use redis::AsyncCommands;
use std::net::SocketAddr;
use warp::{Filter, http::Response};

#[tokio::main] 
async fn main() {
    let addr = SocketAddr::from(([127, 0, 0, 1], 8080));
    let redisclient = redis::Client::open("redis://127.0.0.1:6379").unwrap();
    let rediscon = redis::aio::ConnectionManager::new(redisclient).await.unwrap();

    let routes = warp::path("test")
        .then(|| async move {
            let mut rediscon = rediscon.clone();
            let _server_success: String = rediscon.set("test", "test").await.unwrap();

            Response::builder()
                .body(format!("{}", "test"))
        });

    warp::serve(routes).bind(addr).await;
}

Ah.. well, for the async approach, you'll have to use two moves (but that's not the main problem behind the error you get), and also the clone would need to happen inside of the closure but outside of the async move block. Like this, it moves the original rediscon into the returned closure async block when calling the closure, which can only happen once. So try something like

use redis::AsyncCommands;
use std::net::SocketAddr;
use warp::{Filter, http::Response};

#[tokio::main] 
async fn main() {
    let addr = SocketAddr::from(([127, 0, 0, 1], 8080));
    let redisclient = redis::Client::open("redis://127.0.0.1:6379").unwrap();
    let rediscon = redis::aio::ConnectionManager::new(redisclient).await.unwrap();

    let routes = warp::path("test")
        .then(move || {
            let mut rediscon = rediscon.clone();
            async move {
                let mut rediscon = rediscon.clone();
                let _server_success: String = rediscon.set("test", "test").await.unwrap();

                Response::builder()
                    .body(format!("{}", "test"))
            }
        });

    warp::serve(routes).bind(addr).await;
}

I also tried moving the move before the closure arguments but that resulted in a lifetime error

That worked. Thank you very much!

By the way (and maybe you're already aware of this anyway), this second look at the APIs made me find the way how MultiplexedConnection can be created; I hadn't bothered looking through all of those methods on Client yet :smiley:

I'm not sure what the stream argument is for haha

Me neither, but if you use one of the Client::get_multiplexed[...]_connection methods, you don't need to provide any stream argument. Apparently it also has a convenience method so you don't have to call ConnectionManager::new yourself

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.