Passing an async function and then running it on a async thread

I have the code in the library:

pub async fn launch_server<F>(
    listener: TcpListener, 
    function: impl Fn(&mut TcpStream) -> F
) where F: Future<Output = ()> + std::marker::Send + 'static,
{    loop {
        let (mut socket, _) = listener.accept().await.unwrap();
        tokio::spawn(async move {
            function(&mut socket).await;
        });
    }
}

But when trying to pass a function to it (outside the library):

async fn work(stream: &mut TcpStream) -> () { }

Call code:

launch_server(listener, work).await

I get this error:

error: higher-ranked lifetime error
 --> src\main.rs:9:5
  |
9 |     launch_server(listener, work).await;
  |     ^^^^^^^^^^^^^

Are you using the latest version of Rust? Do you get equally unhelpful error in nightly?

I've tried to reproduce this problem, and I've got several errors, but not this one.

Your function is moved in a loop. Moves take exclusive ownership of the object, and the scope the object has been moved from is not allowed to use it any more. This exclusivity is in conflict with loops, since the loop needs to keep the object for itself and can't move it anywhere, if it wants to have it for the next iteration. You need to make your function implement Copy or Clone and then you need to clone() it for every iteration of the loop.

The function is used inside Send + 'static future, so the function must also implement Send + 'static:

function: impl Fn(&mut TcpStream) -> F + Copy + Send + 'static

Even when you don't use stream in the body of your async function, it is still captured in the opaque future returned from work. This makes the returned future not 'static, but have the lifetime of stream. I.e. work desugars to:

fn work<'a>(stream: &'a mut TcpStream) -> Future<Output=()> + 'a {
    async move {}
}

(Edit: see @quinedot's answer below for how it really desugars :smiling_face_with_tear:)

The async closures RFC explains the weird error message far better than I could:

This happens because the type for the Fut generic parameter is chosen by the caller, in main, and it cannot reference the higher-ranked lifetime for<'c> in the FnMut trait bound, but the anonymous future produced by calling do_something does capture a generic lifetime parameter, and must capture it in order to use the &str argument.

My suggestion would be to change your implementation such that your callback takes TcpStream by value and not by mutable reference while the AsyncFn traits are still unstable:

use std::future::Future;
use tokio::net::{TcpListener, TcpStream};

pub async fn launch_server<F>(
    listener: TcpListener, 
    function: impl Fn(TcpStream) -> F + Send + Copy + 'static,
) where F: Future<Output = ()> + Send,
{    loop {
        let (socket, _) = listener.accept().await.unwrap();
        tokio::spawn(async move {
            function(socket).await;
        });
    }
}

async fn work(_stream: TcpStream) -> () { }

#[tokio::main]
async fn main() {
    let listener = TcpListener::bind("localhost:8080").await.unwrap();
    launch_server(listener, work).await;
}

Playground.

1 Like

Nits:

// Don't apply an outlives bound, but do capture all generics
// (and you need the `impl`)
fn work<'a>(stream: &'a mut TcpStream) -> impl Future<Output=()> + use<'a> {
    async move {
        // Unconditionally capture all arguments
        let stream = stream;
        // (original body here)
    }
}
3 Likes

Although I couldn't get this code to compile :sweat_smile:, it explains pretty well how it actually works.

What to do if it is necessary to pass & or &mut?

You could still pass TcpStream by value and return it back from the future when it is done with it. Example.

You can also work with a concrete dyn Future type instead of an opaque future. This allows you to get rid of the problematic F bound on launch_server. Example.

1 Like

Thank you, I will use option 1, because it suits me the most. And I have never had such ideas before xD

1 Like