Lifetimes in asynchronous closure

I have a generic ObjectCache<T>, kept in a static, that wraps a HashMap mapping u32 "handles" to objects. Given a handle and an async closure that takes a reference to an object, it looks up the corresponding object and calls the closure with that object as the argument:

pub struct ObjectCache<T> {
    pub store: HashMap<u32, T>,
}

impl<T> ObjectCache<T> {
    pub fn new() -> ObjectCache<T> {
        ObjectCache {
            store: Default::default(),
        }
    }

    pub async fn get<F, R, U>(&self, handle: u32, closure: F) -> Result<R, ()>
    where
        F: Fn(&T) -> U,
        U: Future<Output=Result<R, ()>>
    {
        match self.store.get(&handle) {
            Some(obj) => closure(obj).await,
            None => Err(())
        }
    }
}

In it, I want to store an Agent struct which can, as a part of its API, asynchronously process a request:

lazy_static! {
    pub static ref AGENT_CACHE: ObjectCache<Agent> = ObjectCache::<Agent>::new();
}

pub struct Agent;

impl Agent {
    pub async fn process(&self, request: &str) -> Result<u32, ()> {
        info!("Processing {}", request);
        sleep(Duration::from_secs(1)).await;
        Ok(42)
    }
   ...
}

For each function in Agent's API, I want to have a corresponding function which takes the same arguments plus a handle. This function picks up the agent corresponding to the handle from AGENT_CACHE and calls the Agent function through the cache's get() function:

async fn perform_action(request: &str, handle: u32) -> Result<u32, ()> {
    AGENT_CACHE.get(handle, |agent: &Agent| agent.process(request)).await
}

The problem is, the above code does not compile. The error is

error: lifetime may not live long enough
  --> src/main.rs:59:46
   |
59 |     OBJECT_CACHE.get(handle, |agent: &Agent| agent.process(request)).await
   |                                      -     - ^^^^^^^^^^^^^^^^^^^^^^ returning this value requires that `'1` must outlive `'2`
   |                                      |     |
   |                                      |     return type of closure `impl futures::Future` contains a lifetime `'2`
   |                                      let's call the lifetime of this reference `'1`

It seems that the compiler must make sure that the value referenced in the argument is still valid whenever the future is polled, and hence that it lives at least as long as the future itself. In this case, the value in the cache may live as long as 'static, but perhaps it could also be dropped while the future is pending (?).

Is there a way to constrain the lifetimes in such a way that the code is safe, compiles and doesn't get rid of the get() function?

Futures have to contain everything they use, so if agent.process uses self, which is borrowed, then that &Agent will be contained in the Future it returns.

So you have to explain to the borrow checker that the Future returned by the closure is still using the closure's arguments.
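To make the "futures contain everything they use" point concrete, here is a rough sketch of how an async method can be written by hand with the borrow spelled out in the return type. The `id` field, `process_desugared`, `NoopWaker` and `block_on` are hypothetical names added only for this illustration (the tiny executor stands in for a real runtime), and the real desugaring tracks the two argument lifetimes separately.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

pub struct Agent {
    id: u32, // hypothetical field, only so that `self` is visibly used
}

impl Agent {
    // The shape used in the thread (body simplified).
    pub async fn process(&self, request: &str) -> Result<u32, ()> {
        Ok(self.id + request.len() as u32)
    }

    // Hand-written approximation: the returned future stores `self` and
    // `request`, so its type carries the lifetime `'a` of those borrows.
    pub fn process_desugared<'a>(
        &'a self,
        request: &'a str,
    ) -> impl Future<Output = Result<u32, ()>> + 'a {
        async move { Ok(self.id + request.len() as u32) }
    }
}

// Waker that does nothing; enough for futures that never really suspend.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Minimal busy-polling executor so the sketch needs no external crate.
fn block_on<F: Future>(future: F) -> F::Output {
    let mut future = pin!(future);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(value) = future.as_mut().poll(&mut cx) {
            return value;
        }
    }
}
```

Because the future's type mentions `'a`, a closure `|agent: &Agent| agent.process(request)` returns a different future type for every possible lifetime of `agent`, which a single type parameter `U` cannot express.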

Ideally it would be something like this:

 F: for<'a> Fn(&'a T) -> U + 'a,

but unfortunately that isn't valid syntax.

This might work:

pub async fn get<'a, F, R, U>(&'a self, handle: u32, closure: F) -> Result<R, ()>
where
    F: Fn(&'a T) -> U,
    U: Future<Output = Result<R, ()>> + 'a,

Or try F: for<'a> Fn(&'a T) -> BoxFuture<'a, R> instead, which requires using dynamic dispatch instead of generics for the future type.
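A minimal sketch of the boxed-future variant, using only std. `LocalBoxFuture` is a hypothetical alias standing in for `futures::future::BoxFuture` (without the `Send` bound), the cache is passed as a parameter instead of living in a static, and `NoopWaker`/`block_on` are illustration-only helpers. Note the assumption that the request can be cloned into the future as an owned `String`: with this bound the future may borrow only from the closure's argument.

```rust
use std::collections::HashMap;
use std::future::Future;
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Std-only stand-in for futures::future::BoxFuture (no Send bound).
type LocalBoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + 'a>>;

pub struct Agent;

impl Agent {
    pub async fn process(&self, request: &str) -> Result<u32, ()> {
        Ok(request.len() as u32)
    }
}

pub struct ObjectCache<T> {
    pub store: HashMap<u32, T>,
}

impl<T> ObjectCache<T> {
    pub async fn get<F, R>(&self, handle: u32, closure: F) -> Result<R, ()>
    where
        // The boxed future is tied to the lifetime of the argument.
        F: for<'a> Fn(&'a T) -> LocalBoxFuture<'a, Result<R, ()>>,
    {
        match self.store.get(&handle) {
            Some(obj) => closure(obj).await,
            None => Err(()),
        }
    }
}

pub async fn perform_action(
    cache: &ObjectCache<Agent>,
    request: &str,
    handle: u32,
) -> Result<u32, ()> {
    // Own the request so the future does not borrow from outside the closure.
    let request = request.to_owned();
    cache
        .get(handle, move |agent| {
            let request = request.clone();
            Box::pin(async move { agent.process(&request).await })
        })
        .await
}

// Waker that does nothing; enough for futures that never really suspend.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Minimal busy-polling executor so the sketch needs no external crate.
fn block_on<F: Future>(future: F) -> F::Output {
    let mut future = pin!(future);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(value) = future.as_mut().poll(&mut cx) {
            return value;
        }
    }
}
```

The cost is one heap allocation per call plus the clone of the request.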

A different option is to use Arc<Agent> instead of a temporary loan. fn process(self: Arc<Agent>) is allowed. Arc is a shared reference that doesn't require lifetime annotations.
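The Arc-based option could look roughly like the sketch below. It assumes the cache is allowed to store `Arc<T>` values and that `get` may hand out a clone of the Arc. The cache is passed as a parameter rather than read from a static, and `NoopWaker`/`block_on` are hypothetical helpers that stand in for a real runtime.

```rust
use std::collections::HashMap;
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

pub struct Agent;

impl Agent {
    // Takes an owned Arc, so the returned future does not borrow the cache.
    pub async fn process(self: Arc<Self>, request: &str) -> Result<u32, ()> {
        Ok(request.len() as u32)
    }
}

pub struct ObjectCache<T> {
    pub store: HashMap<u32, Arc<T>>,
}

impl<T> ObjectCache<T> {
    pub async fn get<F, R, U>(&self, handle: u32, closure: F) -> Result<R, ()>
    where
        // No reference in the argument, hence no higher-ranked lifetime.
        F: Fn(Arc<T>) -> U,
        U: Future<Output = Result<R, ()>>,
    {
        match self.store.get(&handle) {
            Some(obj) => closure(Arc::clone(obj)).await,
            None => Err(()),
        }
    }
}

pub async fn perform_action(
    cache: &ObjectCache<Agent>,
    request: &str,
    handle: u32,
) -> Result<u32, ()> {
    // The future may still borrow `request`: that lifetime is fixed at the
    // call site and does not depend on the closure's argument.
    cache.get(handle, move |agent| agent.process(request)).await
}

// Waker that does nothing; enough for futures that never really suspend.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Minimal busy-polling executor so the sketch needs no external crate.
fn block_on<F: Future>(future: F) -> F::Output {
    let mut future = pin!(future);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(value) = future.as_mut().poll(&mut cx) {
            return value;
        }
    }
}
```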

I tried the BoxFuture approach; it seems hard or impossible to capture the request: &str variable with this API. The F closure would, for example, be forced to return a BoxFuture<'static, R> when given a &'static T, so there is no way to capture the short-lived &str in this future.

I don't know that it means the same thing as you intended, but

for<'a> F: Fn(&'a T) -> U + 'a,

is valid syntax. (The reference is unclear on this point, and its example is one where the different scopes make no difference, which has always irked me.)

That's... unexpected, or funny, I guess? While trying to test a possible workaround I ran into a stack overflow in rustc. That doesn't happen every day. For anyone interested, this is the not-at-all-cleaned-up code that triggered it (with a few more lines commented out due to an unrelated dependency that is unavailable in the playground) [link]. Will write up a bug report next. Edit: Done, #92833.

Unfortunately, the meaning of that is the same as this:

for<'a> F: Fn(&'a T) -> U,
for<'a> F: 'a,

So the 'a binds to F, and not to U.

So, FWIW, the following works:

#![allow(unused)]
#![warn(unused_must_use)]

use ::{
    futures::{
        future::{BoxFuture, FutureExt},
    },
    std::{
        collections::HashMap,
        future::Future,
    },
};

pub
struct ObjectCache<T> {
    pub store: HashMap<u32, T>,
}

impl<T> ObjectCache<T> {
    pub
    fn new ()
      -> ObjectCache<T>
    {
        ObjectCache {
            store: <_>::default(),
        }
    }

    pub
    async
    fn get<'up, F : 'up, R> (
        self: &'_ ObjectCache<T>,
        handle: u32,
        closure: F,
    ) -> Result<R, ()>
    where
        for<'r>
            F : FnOnce(&'r T, [&'r &'up (); 0])
                  -> BoxFuture<'r, Result<R, ()>>
        ,
    {
        match self.store.get(&handle) {
            | Some(obj) => closure(obj, []).await,
            | None => Err(())
        }
    }
}

async
fn perform_action (
    request: &'_ str,
    handle: u32,
) -> Result<u32, ()>
{
    AGENT_CACHE
        .get(
            handle,
            move |agent: &'_ Agent, []| {
                agent.process(request)
            }.boxed(),
        )
        .await
}

::lazy_static::lazy_static! {
    pub
    static ref AGENT_CACHE: ObjectCache<Agent> = {
        ObjectCache::<Agent>::new()
    };
}

pub
struct Agent;

impl Agent {
    pub
    async
    fn process (
        self: &'_ Agent,
        request: &'_ str,
    ) -> Result<u32, ()>
    {
        // info!("Processing {}", request);
        let () = async {}.await;
        // sleep(Duration::from_secs(1)).await;
        Ok(42)
    }
}

It showcases a hack (described elsewhere) for upper-bounding the higher-order lifetime in for<'r> FnOnce(&'r T) -> BoxFuture<'r, …> (the problem @steffahn was talking about): add an extra phony parameter, [&'r &'up (); 0] (for some "free" generic lifetime parameter <'up>), so as to yield an implicit where 'up : 'r bound on the higher-order lifetime. That bound is what allows the future to capture things from outside the closure. The call sites then have to provide that extra [] parameter.

That being said, I just wonder why you didn't "just" use:

    pub
    async
-   fn get<F, R, U> (
-       self: &'_ ObjectCache<T>,
+   fn get<'get, F, R, U> (
+       self: &'get ObjectCache<T>,
        handle: u32,
        closure: F,
    ) -> Result<R, ()>
    where
-       F : Fn(&T) -> U,
+       F : Fn(&'get T) -> U,
        U : Future<Output = Result<R, ()>> + Send,
    {
        match self.store.get(&handle) {
            Some(obj) => closure(obj).await,
            None => Err(())
        }
    }

That is, no need to go higher-order unless you need to! (especially for an ObjectCache which ends up in a static, whereby 'get can be picked to be 'static, which further simplifies things).

  • Or is your self.store.get(&handle) a simplification? And there is some RwLock kind of layer which yields a locally-produced-and-thus-locally-borrowed value?
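For reference, the simpler non-higher-order signature from the diff above can be exercised end to end with a std-only sketch like the following. `OnceLock` replaces `lazy_static!` here, and `agent_cache`, `NoopWaker` and `block_on` are hypothetical helpers added for the illustration. The `Send` bound from the diff is dropped because this toy executor does not need it.

```rust
use std::collections::HashMap;
use std::future::Future;
use std::pin::pin;
use std::sync::{Arc, OnceLock};
use std::task::{Context, Poll, Wake, Waker};

pub struct Agent;

impl Agent {
    pub async fn process(&self, request: &str) -> Result<u32, ()> {
        Ok(request.len() as u32)
    }
}

pub struct ObjectCache<T> {
    pub store: HashMap<u32, T>,
}

impl<T> ObjectCache<T> {
    pub async fn get<'get, F, R, U>(&'get self, handle: u32, closure: F) -> Result<R, ()>
    where
        // One fixed lifetime, tied to the borrow of the cache itself.
        F: Fn(&'get T) -> U,
        U: Future<Output = Result<R, ()>>,
    {
        match self.store.get(&handle) {
            Some(obj) => closure(obj).await,
            None => Err(()),
        }
    }
}

// Std-only replacement for the lazy_static! cache (illustrative contents).
static AGENT_CACHE: OnceLock<ObjectCache<Agent>> = OnceLock::new();

fn agent_cache() -> &'static ObjectCache<Agent> {
    AGENT_CACHE.get_or_init(|| {
        let mut store = HashMap::new();
        store.insert(1, Agent);
        ObjectCache { store }
    })
}

pub async fn perform_action(request: &str, handle: u32) -> Result<u32, ()> {
    // Here 'get can be 'static, so the future may borrow both the agent
    // and the short-lived request.
    agent_cache()
        .get(handle, move |agent| agent.process(request))
        .await
}

// Waker that does nothing; enough for futures that never really suspend.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Minimal busy-polling executor so the sketch needs no external crate.
fn block_on<F: Future>(future: F) -> F::Output {
    let mut future = pin!(future);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(value) = future.as_mut().poll(&mut cx) {
            return value;
        }
    }
}
```

This only works while the reference handed to the closure really lives as long as the borrow of the cache.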

It is a simplification (I was trying to make the example minimal); the actual ObjectCache<T> looks like this:

pub struct ObjectCache<T> {
    ...
    pub store: RwLock<HashMap<u32, Mutex<T>>>,
}

In that case the higher-order lifetime is indeed warranted.
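A rough sketch of why: with locks, the &T handed to the closure borrows from a guard that is local to get(), so no caller-chosen lifetime can name it. The version below combines the phony-parameter trick from earlier in the thread with the locked store. It assumes std's blocking `RwLock`/`Mutex` (the real code may well use async locks), a cache passed as a parameter, and the hypothetical `LocalBoxFuture`, `NoopWaker` and `block_on` helpers.

```rust
use std::collections::HashMap;
use std::future::Future;
use std::pin::{pin, Pin};
use std::sync::{Arc, Mutex, RwLock};
use std::task::{Context, Poll, Wake, Waker};

// Std-only stand-in for futures::future::BoxFuture (no Send bound).
type LocalBoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + 'a>>;

pub struct Agent;

impl Agent {
    pub async fn process(&self, request: &str) -> Result<u32, ()> {
        Ok(request.len() as u32)
    }
}

pub struct ObjectCache<T> {
    pub store: RwLock<HashMap<u32, Mutex<T>>>,
}

impl<T> ObjectCache<T> {
    pub async fn get<'up, F: 'up, R>(&self, handle: u32, closure: F) -> Result<R, ()>
    where
        // The zero-length array only exists to imply `'up: 'r`.
        for<'r> F: FnOnce(&'r T, [&'r &'up (); 0]) -> LocalBoxFuture<'r, Result<R, ()>>,
    {
        let map = self.store.read().map_err(|_| ())?;
        let slot = map.get(&handle).ok_or(())?;
        // The guard is a local, so the borrow below cannot outlive this call.
        let guard = slot.lock().map_err(|_| ())?;
        let result = closure(&*guard, []).await;
        result
    }
}

pub async fn perform_action(
    cache: &ObjectCache<Agent>,
    request: &str,
    handle: u32,
) -> Result<u32, ()> {
    cache
        .get(handle, move |agent, []| Box::pin(agent.process(request)))
        .await
}

// Waker that does nothing; enough for futures that never really suspend.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Minimal busy-polling executor so the sketch needs no external crate.
fn block_on<F: Future>(future: F) -> F::Output {
    let mut future = pin!(future);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(value) = future.as_mut().poll(&mut cx) {
            return value;
        }
    }
}
```

Holding std lock guards across an .await makes the future non-Send and can block a multi-threaded runtime, so treat this purely as a lifetime illustration.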