How to correctly call `async move` closures that accepts reference type as inputs

Rust Playground (rust-lang.org)

#[derive(Debug)]
struct A;
async fn f() {
    let x = A;
    let h = |x: &A| async move { println!("{:?}", x) };
    h(&x).await;
}

fn main() {}

I am trying to make a function receive an asynchronous closure as argument but fail. After some digging, I reduce the problem into the above simplified version, where the async move block is supposed to be a user-provided closure, that I simplified by simply inline it into the code.

How should I make this work?

If instead of using closure, I directly use an async block, this compiles as normal.

Rust Playground (rust-lang.org)

#[derive(Debug)]
struct A;
async fn f() {
    let x = A;
    // let h = |x: &A| async move { println!("{:?}", x) };
    // h(&x).await;
    async move {
        println!("{:?}", x);
    }.await
}

fn main() {
}

But this is not what I want since I want the user to provide the logic instead.

It's currently a little tricky to combine async, closures, and borrows and get the compiler to accept it. In the future, “async closures” may be supported, async |x: &A| { println!("{:?}", x) }, which will give the compiler more information to work with, but for now, you have to go to some extra lengths.

In general, even without async being involved, getting the type of a closure right is often best done by passing the closure to a function (because the function's signature constrains the type the closure should have, in ways that can't necessarily be expressed directly by the closure syntax). For example, this code compiles:

use std::future::Future;

#[derive(Debug)]
struct A;

fn async_callback<T, F, Fut>(f: F) -> F
where
    F: Fn(T) -> Fut,
    Fut: Future<Output = ()>,
{
    f
}

async fn f() {
    let x = A;
    let h = async_callback(|x: &A| async move { println!("{:?}", x) });
    h(&x).await;
    h(&A).await;
}

async_callback doesn't do anything itself (it is a fancy identity-function) but it constrains how the types of the closure and the async block interact with each other. However, you say that "I am trying to make a function receive an asynchronous closure as argument", so you probably already had something more like this than the code samples you posted. Can you please post a version of your problematic code which actually has the function that takes the closure as an argument? That's an importantly different situation than using a closure you just declared in the same function, and may not be solved by the same solutions.

2 Likes

Thank you.

The following is the more complete version of the code.

Rust Playground (rust-lang.org)

Unfortunately, I failed to convert your trick of identity function into my code. By adding lifetime annotations that I think necessary, the best I get is this Rust Playground (rust-lang.org).

I am thinking if this is because I am trying to hold a lock across await point, but as far as I know it would only be a clippy warning not compiler error.

There are two problems here. The shallow problem is this:

    pub async fn with_data<O, F, Fut>(&self, fun: F) -> O
    where
        F: FnOnce(Vec<&Thing>) -> Fut,
        Fut: Future<Output = O>,
        O: 'static,

When the function is called every type parameter has a single value (a concrete type), chosen by the caller (abstractly if not literally). But here, you want Fut to be able to borrow the Vec<&Thing>, but that Vec has a lifetime which is shorter than the function parameters (because it's a Vec owned by the function body), which is impossible to satisfy by any caller-chosen Fut type.

This particular sub-problem can be solved by using a trait alias for the combination of FnOnce and Future — which the handy async_fn_traits crate provides. You'd use it like this:

    pub async fn with_data<O, F>(&self, fun: F) -> O
    where
        F: for<'a> AsyncFnOnce1<Vec<&'a Thing>, Output = O>,
        O: 'static,

Notice that there now is no Fut type mentioned at all, so it doesn't have to be constrained to be a single parameter.

However, with only this change, the call site still doesn't compile:

error: lifetime may not live long enough
  --> src/main.rs:49:42
   |
49 |           .with_data(|things: Vec<&Thing>| async move {
   |  _________________________________-______-_^
   | |                                 |      |
   | |                                 |      return type of closure `{async block@src/main.rs:49:42: 53:10}` contains a lifetime `'2`
   | |                                 let's call the lifetime of this reference `'1`
50 | |             things.into_iter().for_each(|t| {
51 | |                 dbg!(&t);
52 | |             });
53 | |         })
   | |_________^ returning this value requires that `'1` must outlive `'2`

and I don't know any tricks for solving that problem. Unstable async_closures does solve it:

#![feature(async_closure)]

...
    let _filtered = app
        .with_data(async move |things: Vec<&Thing>| {
            things.into_iter().for_each(|t| {
                dbg!(&t);
            });
        })
        .await;

but perhaps someone else has a better idea for stable.

2 Likes

I'm quite confused about lifetimes, however, I have a "despicable" method:

use futures::Future;
use tokio::sync::RwLock;

#[derive(Debug)]
struct Thing {}

struct Application {
    things: RwLock<Vec<Thing>>,
}

impl Application {
    pub async fn with_data<F, Fut>(&self, fun: F)
    where
        F: FnOnce(Vec<&'static Thing>) -> Fut,
        Fut: Future<Output = ()>,
    {
        let things = self.things.read().await;
        let mut filtered = vec![];
        for t in things.iter() {
            if true {
                // imagine some filtering logic
                let t = t as *const Thing;
                let t: &'static Thing = unsafe { &*t };    // Coercion
                filtered.push(t);
            }
        }
        fun(filtered).await    // it's safe to trick the compiler that `filtered` is 'static, 
                                       // because we awaited it before its dropping
    }
}

#[tokio::main]
async fn main() {
    let app = Application {
        things: RwLock::new(Default::default()),
    };

    let _filtered = app
        .with_data(|things| async move {
            things.iter().for_each(|t| {
                dbg!(&t);
            });
        })
        .await;
}

As I said before, I'm confused about this, maybe this is acturally unsafe.

Edit: it's unsafe if &'static Thing is stashed or sent through a channel.

And you may read std::thread::scope, it's for situation like this:

use std::thread;

// This is wrong
fn foo(&mut self) {
    let a = &mut self.a;
    let jh = thread::spawn(move || *a += 1 );    // this need self to be 'static
    jh.join();
}

// The right one:
fn foo(&mut self) {
    let a = &mut self.a;
    thread::scope(|s| { 
        s.spawn(move || { *a += 1});
    });
}

await and join() look similar to each other,
so I feel there might be the safe method in thread::scope.

This is unsound because the function may take the &'static Things it is given and stash them to use later, after their referents have been deallocated.

How lucky I am, I just add lifetimes randomly, and the two ways could work:

#![feature(async_closure, async_fn_traits)]

use core::future::Future;
use std::ops::AsyncFnOnce;

#[derive(Debug)]
struct Thing;

async fn foo1<F>(f: F)
where
    for<'a> F: AsyncFnOnce(&'a Thing),
{
    let t = &Thing;
    f(t).await;
}

async fn foo2<'a, F, Fut>(f: F)
where
    F: FnOnce(&'a Thing) -> Fut,
    Fut: Future<Output = ()>,
{
    let t = &Thing;
    f(t).await;
}

#[tokio::main]
async fn main() {
    foo1(async move |t| {
        println!("{:?}", t);
    })
    .await;
    foo2(move |t| async move {    // double `move` here
        println!("{:?}", t);
    })
    .await;
}

I'm inspired by this. Can anyone so kind to explain why this can work?

1 Like

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.