How to pass a closure that returns a Future, as function argument?

Hello everyone, I am running into another problem now. I am using tokio to handle async/await, and what I want to do is something like this:

server.listen(|request, response| async {
        println!("Middleware");
}).await;

As far as I know, I cannot create a Future that takes arguments, so I found this way to accept a function that returns a future as a parameter:

pub async fn listen<Fut>(&self, middleware: impl FnOnce() -> Fut + Send) where Fut: Future<Output = ()> {
        ...
}

This kind of works, but the problem is that this function is responsible for spawning tokio tasks, and I want to pass this FnOnce() -> Fut to them as well, so I am passing it like this:

pub async fn listen<Fut>(&self, middleware: impl FnOnce() -> Fut + Send) where Fut: Future<Output = ()> {
        let func = Arc::new(Mutex::new(Box::new(middleware)));
        loop {
            let (socket, addr) = self.listener.accept().await.unwrap();
            let func = func.clone();
            tokio::spawn(async {
                handle(socket, func).await; // Here it is being passed in
            });
        }
}

The problem arises in the handle function, which looks like this:

async fn handle<Fut>(mut socket: TcpStream, middleware: Arc<Mutex<Box<impl FnOnce() -> Fut>>>) where Fut: Future<Output = ()>, {
    let (read, mut writer) = socket.split();
    let reader = BufReader::new(read);
    let request = parse_request(reader).await;
    {
        let func = middleware.lock().unwrap();
        func(); // cannot move out of dereference of `std::sync::MutexGuard<'_, std::boxed::Box<impl FnOnce() -> Fut>>`
    }
    let response = b"HTTP/1.1 200 OK\r\nContent-Length: 2\r\n\r\nOK";
    writer.write_all(response).await.unwrap();
}

I do not understand why this error is happening. What am I doing wrong?
Thanks

Mutexes only give you by-reference access, so unless you resort to some tricks like Option::take, you can't take ownership of the value behind the mutex. But calling FnOnce takes ownership of it. So use impl FnMut instead of impl FnOnce.

It also wouldn't quite make sense to use FnOnce for a handler, even if it were not behind a mutex. Handlers are typically called many times, so if you were to store the function in any kind of collection, the same issue would arise. For example, if you stored it in a hash table or a vector, you could technically move it out, but then it wouldn't be found next time you needed it, probably leading to a logical error and/or panics.
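
To illustrate the difference, here is a minimal synchronous sketch using only the standard library (no tokio). The names call_handler and demo_fn_mut are made up for this example and do not come from the original code. It shows that a FnMut can be called through the mutable access a MutexGuard provides, so the same handler can be invoked repeatedly.

```rust
use std::sync::{Arc, Mutex};

// Hypothetical helper: calls a shared handler through the mutex.
// FnMut only needs `&mut` access, which the guard provides.
// With `F: FnOnce() -> u32` the call below would not compile, because
// the closure would have to be moved out of the guard.
fn call_handler<F: FnMut() -> u32>(handler: &Arc<Mutex<F>>) -> u32 {
    let mut guard = handler.lock().unwrap();
    (*guard)()
}

// Calls the same handler twice and returns both results.
pub fn demo_fn_mut() -> (u32, u32) {
    let mut calls: u32 = 0;
    // The closure mutates its captured counter, so it is FnMut but not Fn.
    let handler = Arc::new(Mutex::new(move || {
        calls += 1;
        calls
    }));
    let first = call_handler(&handler);
    let second = call_handler(&handler);
    (first, second)
}
```

Calling demo_fn_mut() invokes the handler twice, which is exactly what a FnOnce bound would forbid.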

Hi,

Without digging too much into the details, at first look the use of FnOnce is quite likely the issue. The reason is that functions of this type can only be called once, so calling it with func() would consume your callback. This is not possible because you only have borrowed access to the function, as a result of it being wrapped in a Mutex.

Thus I'd suggest changing the definition to either Fn or FnMut.

If this callback should really be called only once, you might want to further wrap the function in an Option and take it out for the call. This way you can consume the function and leave a None behind, ensuring it is never called a second time. But I guess this is not what you want, as you call it within a loop.
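
For completeness, the Option approach could look roughly like the sketch below. It uses only the standard library, and call_once and demo_take are hypothetical names chosen for this example. Option::take moves the closure out of the mutex and leaves None in its place, so a second call finds nothing to run.

```rust
use std::sync::{Arc, Mutex};

// Hypothetical helper: runs the stored callback at most once.
// `take()` moves the closure out of the Option and leaves `None` behind.
fn call_once<F: FnOnce() -> String>(slot: &Arc<Mutex<Option<F>>>) -> Option<String> {
    let callback = slot.lock().unwrap().take();
    callback.map(|f| f())
}

// Tries to run the callback twice; only the first attempt finds it.
pub fn demo_take() -> (Option<String>, Option<String>) {
    let greeting = String::from("hello");
    // Returning the captured String by value makes this closure FnOnce only.
    let slot = Arc::new(Mutex::new(Some(move || greeting)));
    (call_once(&slot), call_once(&slot))
}
```

The second call returning None is the behaviour that makes this a poor fit for a handler that runs on every connection.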

Thank you for your replies, that fixed this issue. But now a new issue arises when I try to implement argument passing. Here is the function that receives the closure:

async fn handle<'a, Fut>(mut socket: TcpStream, middleware: Arc<Mutex<Box<impl FnMut(Request, Response) -> Fut + Send + 'static>>>) where Fut: Future<Output = ()> + Send, {
    let (mut read, mut writer) = socket.split();
    let reader = BufReader::new(read);
    let request = parse_request(reader).await; // Returns type Request
    let response = Response::new(BufWriter::new(writer)); // Return type Response<'a>

    {
        let lock = middleware.lock().unwrap()(request, response);
        lock.await;
    }

This is my Response object implementation:

pub struct Response<'a> {
    protocol: String,
    writer: BufWriter<WriteHalf<'a>>
}

impl <'a> Response<'a> {
    pub fn new(mut writer: BufWriter<WriteHalf <'_>>) -> Response {
        Response {
            protocol: String::from("HTTP/1.1"),
            writer
        }
    }

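    pub async fn write(&mut self, data: &Vec<u8>) {
        // Status line and headers; the blank line (\r\n\r\n) ends the header section.
        let response = format!("{} {}\r\nContent-Length: {}\r\n\r\n", self.protocol, "200 OK", data.len());
        // write_all keeps writing until the whole buffer is sent; write may stop early.
        self.writer.write_all(response.as_bytes()).await.unwrap();
        self.writer.write_all(data).await.unwrap();
        // Flush the BufWriter so the buffered bytes actually reach the socket.
        self.writer.flush().await.unwrap();
    }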
}

The Request object is passed in correctly with no errors, but when I try to pass the Response object, I get this error on these lines:

server.listen(|request, response| async move {
        println!("{:?}", request.headers);
        //lifetime may not live long enough
        // returning this value requires that `'1` must outlive `'2`
        println!("{:?}", response);
}).await;

I am sure the problem is caused by the lifetime in WriteHalf, but I still don't understand how I would fix this error.

Async callbacks cannot take borrowed parameters; it's a known limitation. You'll have to make that type 'static, probably by using an OwnedWriteHalf. Also, instead of using a mutex around a FnMut, just use a Fn: Arc<Box<impl Fn(Request, Response) -> Fut + Send + Sync>>. And I'm not quite sure why you're boxing it twice; it's not necessary.
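
A rough sketch of that shape, written against the standard library only so it runs without tokio. Everything here is an assumption made for illustration: Request and Response are simplified stand-ins that own their data (in real tokio code the owned halves would come from TcpStream::into_split), block_on is a tiny executor that only suits futures that never wait, and the future returns the Response just so the result is observable.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical stand-ins: both own their data, so no lifetime parameter.
pub struct Request { pub path: String }
pub struct Response { pub body: Vec<u8> }

struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Minimal executor for illustration; it simply polls until the future is ready.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

// No Mutex and no Box: a `Fn` can be called through the shared Arc.
fn handle<F, Fut>(middleware: Arc<F>, path: &str) -> Vec<u8>
where
    F: Fn(Request, Response) -> Fut,
    Fut: Future<Output = Response>,
{
    let request = Request { path: path.to_string() };
    let response = Response { body: Vec::new() };
    block_on((*middleware)(request, response)).body
}

// Shares one middleware between two "connections".
pub fn demo_shared_fn() -> (Vec<u8>, Vec<u8>) {
    let middleware = Arc::new(|req: Request, mut res: Response| async move {
        res.body.extend_from_slice(req.path.as_bytes());
        res
    });
    let a = handle(Arc::clone(&middleware), "/a");
    let b = handle(middleware, "/b");
    (a, b)
}
```

Because both parameters are owned, the async move block can take them by value and the returned future borrows nothing from the caller.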

I cannot use Arc<Box<impl Fn(Request, Response) -> Fut + Send + Sync>> because then the error is:

future cannot be sent between threads safely
future created by async block is not `Send`

Make sure you bound the Fut type by Send, then. You would have had to do that anyway.
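
Here is a small sketch of why the bound is needed. tokio::spawn requires its future to be Send + 'static; to keep the example runnable without tokio, spawn_on_thread below is a hypothetical stand-in that imposes the same bounds using std::thread. The names run_handler, block_on and demo_send_bound are likewise made up for this example, and a String stands in for the request.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread;

struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Minimal executor for illustration; it simply polls until the future is ready.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

// Stand-in for tokio::spawn: the future moves to another thread,
// so it has to be Send + 'static.
fn spawn_on_thread<Fut>(fut: Fut) -> thread::JoinHandle<Fut::Output>
where
    Fut: Future + Send + 'static,
    Fut::Output: Send + 'static,
{
    thread::spawn(move || block_on(fut))
}

// Removing `+ Send` from the `Fut` bound makes the spawn call fail with
// "future cannot be sent between threads safely".
pub fn run_handler<F, Fut>(handler: Arc<F>, input: String) -> usize
where
    F: Fn(String) -> Fut + Send + Sync + 'static,
    Fut: Future<Output = usize> + Send + 'static,
{
    // The outer async block holds the handler's future across an await,
    // so it is Send only if `Fut` is Send.
    let task = async move { (*handler)(input).await };
    spawn_on_thread(task).join().unwrap()
}

pub fn demo_send_bound() -> usize {
    let handler = Arc::new(|s: String| async move { s.len() });
    run_handler(handler, String::from("hello"))
}
```

The key point is that the bound belongs on Fut itself, not only on the closure: the closure being Send + Sync says nothing about the future it returns.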
