Lifetime bounds while passing mutable references across async boundaries

I am trying to write a utility function that retries an API call on a mutable reference until it succeeds:

use std::{future::Future, sync::Arc, time::Duration};
use tokio::sync::Notify;

// Helper trait to express the HRTB requirement for the work closure.
pub trait WorkFn<'a, T, ResT>: Send + 'static {
    type Future: Future<Output = Result<ResT, tonic::Status>> + Send + 'a;
    fn call_mut(&mut self, t: &'a mut T) -> Self::Future;
}

// Blanket implementation to automatically satisfy WorkFn for any closure F
impl<'a, T, ResT, F, Fut> WorkFn<'a, T, ResT> for F
where
    F: FnMut(&'a mut T) -> Fut + Send + 'static,
    // The Future must live at least as long as the short-lived mutable reference.
    Fut: Future<Output = Result<ResT, tonic::Status>> + Send + 'a,
    T: 'a,
    ResT: 'a,
{
    type Future = Fut;
    fn call_mut(&mut self, t: &'a mut T) -> Self::Future {
        self(t)
    }
}

async fn async_mut_retry<T, F, S, R, ResT>(
    // 1. Mutable reference to the resource (e.g., &mut Client)
    resource: &mut T,
    // 2. The shutdown signal
    shutdown_notify: Arc<Notify>,
    // 3. The retry strategy (an iterator of Durations)
    mut strategy: S,
    // 4. The work to be done (takes &mut T)
    mut work: F,
    // 5. The custom retry logic (takes the error)
    mut is_retryable: R,
) -> Result<ResT, tonic::Status>
where
    F: for<'a> WorkFn<'a, T, ResT>,   // Work closure
    ResT: Send + 'static,             // Success type
    S: Iterator<Item = Duration>,     // Strategy iterator
    R: FnMut(&tonic::Status) -> bool, // Retry logic closure
    T: Send + 'static,
{
    loop {
        // --- 1. NON-BLOCKING SHUTDOWN CHECK BEFORE WORK ---
        tokio::select! {
            _ = shutdown_notify.notified() => {
                return Err(tonic::Status::cancelled("Shutdown requested before attempt"));
            }
            _ = tokio::task::yield_now() => {}
        }

        // --- 2. PERFORM THE WORK ---
        let result = work.call_mut(resource).await;

        match result {
            // Success
            Ok(output) => return Ok(output),

            // Failure: Check custom retry logic
            Err(e) => {
                if is_retryable(&e) {
                    // Get the next delay
                    let delay = match strategy.next() {
                        Some(d) => d,
                        None => {
                            // Strategy exhausted
                            return Err(e);
                        }
                    };

                    println!("Retryable error: {}. Retrying in {:?}...", e, delay);

                    // --- 3. AWAIT BACKOFF OR SHUTDOWN ---
                    tokio::select! {
                        _ = tokio::time::sleep(delay) => {
                            continue; // Delay finished, loop continues
                        }
                        _ = shutdown_notify.notified() => {
                            // Shutdown arrived during delay
                            return Err(tonic::Status::cancelled("Shutdown requested during backoff"));
                        }
                    }
                } else {
                    // Non-retryable error
                    return Err(e);
                }
            }
        }
    }
}

I call this utility with a client like this:

        let retry_strategy = tokio_retry::strategy::ExponentialBackoff::from_millis(INITIAL_RETRY_DELAY_MS)
            .max_delay(Duration::from_millis(MAX_RETRY_DELAY_MS));

        let result = async_mut_retry(
            &mut client,
            shutdown_notify.clone(),
            retry_strategy,
            |client: &mut TonicGeneratedClient<u64>| {
//               let mut client_clone = client.clone();
//               async move {
//                    let request = TonicApiRequest {};
//                    client_clone.api_call(request).await
//               }
                 let request = TonicApiRequest {};
                 client.api_call(request)
            },
            |e| is_recoverable_error(e),
        )
        .await;

My generated tonic code looks roughly like this:

        pub async fn api_call(
            &mut self,
            request: impl tonic::IntoRequest<super::TonicApiRequest>,
        ) -> std::result::Result<
            tonic::Response<tonic::codec::Streaming<super::TonicApiResponse>>,
            tonic::Status,
        > {
            self.inner
                .ready()
                .await
                .map_err(|e| {
                    tonic::Status::unknown(
                        format!("Service was not ready: {}", e.into()),
                    )
                })?;
            let codec = tonic_prost::ProstCodec::default();
            let path = http::uri::PathAndQuery::from_static(
                "/dummy.DummyService/endpoint",
            );
            let mut req = request.into_request();
            req.extensions_mut()
                .insert(GrpcMethod::new("dummy.DummyService", "Endpoint"));
            self.inner.server_streaming(req, path, codec).await
        }

The current version of the call does not compile. It fails with the following lifetime errors on the client.api_call line:

  • lifetime may not live long enough: returning this value requires that '1 must outlive '2
  • let's call the lifetime of this reference '1
  • return type of closure impl futures::Future<Output = Result<tonic::Response<Streaming<TonicApiResponse>>, Status>> contains a lifetime '2

However, the commented-out code works. I understand that it works because the clone creates an owned value, so the lifetime of the returned future is no longer tied to the mutable reference that was passed in. I would like to avoid cloning the client if possible. Arc<> does not help, since the api_call method needs &mut self. Any suggestions on how to fix this? I do not understand Rust lifetime parameters and bounds very well, but I suspect there is a fix if I specify the correct bounds on the helper function.

The code also works if I inline the entire function at the call site.

Can you please create a smaller example that will represent the same situation, but inside a playground? https://play.rust-lang.org

Does this fix it?

-        |client: &mut Client| {
+        async |client: &mut Client| {
             let request = TonicApiRequest {};
-            client.api_call(request)
+            client.api_call(request).await
         },

(Err, I changed the closure signature while reproducing, but you get the idea.)


Yes!! That did it. Would you mind also explaining a little of what this means?

Well, async closures can have different capabilities than non-async closures by implementing the AsyncFn traits, which allows "lending closures" -- closures that return references to owned values they've captured, say. You are not actually using that capability -- you would need to be using AsyncFnMut bounds instead of FnMut bounds -- but the mere existence of those capabilities means that inference must be somewhat different in an async closure than a normal closure.

So that was just the first thing I tried once I had a reproduction. Fortunately, it worked. (As the link says, async closures still implement the normal Fn traits when possible.)
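For reference, here is a rough sketch of what a helper with AsyncFnMut bounds might look like, as opposed to the FnMut-based WorkFn trait in the OP. It assumes Rust 1.85 or later (where async closures are stable) and uses only std. Client, retry, block_on and run are hypothetical names made up for this sketch, the error type is a plain String instead of tonic::Status, and the Send bounds, backoff and shutdown handling from the OP are left out.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical stand-in for the tonic client: fails twice, then succeeds.
struct Client {
    attempts: u32,
}

impl Client {
    async fn api_call(&mut self) -> Result<u32, String> {
        self.attempts += 1;
        if self.attempts < 3 {
            Err(format!("attempt {} failed", self.attempts))
        } else {
            Ok(self.attempts)
        }
    }
}

// With an AsyncFnMut bound, the future returned by `work` may borrow from the
// `&mut T` argument, so no helper trait or named future type is needed.
async fn retry<T, R, F>(resource: &mut T, max_attempts: u32, mut work: F) -> Result<R, String>
where
    F: AsyncFnMut(&mut T) -> Result<R, String>,
{
    let mut last = Err(String::from("no attempts made"));
    for _ in 0..max_attempts {
        // Reborrow so `resource` can be lent out again on the next iteration.
        last = work(&mut *resource).await;
        if last.is_ok() {
            break;
        }
    }
    last
}

// Minimal busy-polling executor, only suitable for futures that never wait.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

fn block_on<Fut: Future>(fut: Fut) -> Fut::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(value) = fut.as_mut().poll(&mut cx) {
            return value;
        }
    }
}

fn run() -> Result<u32, String> {
    let mut client = Client { attempts: 0 };
    block_on(retry(&mut client, 5, async |c: &mut Client| c.api_call().await))
}
```

In a real program the block_on stand-in would be replaced by the tokio runtime, and the helper would likely need Send bounds on the returned futures if it is called from a spawned task.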


Normal closures have a lot of problems around inferring that you want to return a type with a lifetime that corresponds to an input -- something as simple as str::trim's signature. There are partial workarounds, but they generally require naming the return type. That means they don't generally apply to future-returning closures unless you type-erase the futures, since most futures are unnameable. I've beaten my head on that wall enough times to try something else first.
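To make the str::trim case concrete: a closure such as `|s: &str| s.trim()` bound with a plain `let` is typically rejected with the same "lifetime may not live long enough" error as in the OP. One of the partial workarounds is to pass the closure through a no-op "funnel" function whose bound names the return type. The sketch below shows that pattern; constrain and trimmed_lengths are hypothetical names chosen for illustration.

```rust
// No-op funnel: its bound tells the compiler that the closure's output
// borrows from its input, for every input lifetime.
fn constrain<F>(f: F) -> F
where
    F: for<'a> Fn(&'a str) -> &'a str,
{
    f
}

fn trimmed_lengths(inputs: &[&str]) -> Vec<usize> {
    // The closure is checked against the funnel's signature, so the returned
    // `&str` is tied to the argument's lifetime.
    let trim = constrain(|s| s.trim());
    inputs.iter().map(|&s| trim(s).len()).collect()
}
```

This only works because `&'a str` can be written down in the bound. The future returned by an async fn has no name, which is why the same trick does not carry over directly.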

Anyway, that poor inference for normal closures is probably what is going on in the OP (but I stopped poking at it once the change allowed compilation).
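For toolchains without async closures, the type-erasure route mentioned above might look roughly like the sketch below. Boxing the future gives it a nameable type, so the helper's bound can tie the future's lifetime to the borrowed argument. Client, retry_boxed, block_on and run_boxed are hypothetical names, the BoxFuture alias is defined locally, the error type is a plain String, and Send bounds are omitted. It costs one allocation per attempt.

```rust
use std::future::Future;
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Type-erased future that may borrow for the lifetime 'a.
type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + 'a>>;

// Hypothetical stand-in for the tonic client: fails twice, then succeeds.
struct Client {
    attempts: u32,
}

impl Client {
    async fn api_call(&mut self) -> Result<u32, String> {
        self.attempts += 1;
        if self.attempts < 3 {
            Err(format!("attempt {} failed", self.attempts))
        } else {
            Ok(self.attempts)
        }
    }
}

// Because the boxed future has a name, the higher-ranked bound can say that
// it borrows from the `&'a mut T` argument.
async fn retry_boxed<T, R, F>(resource: &mut T, max_attempts: u32, mut work: F) -> Result<R, String>
where
    F: for<'a> FnMut(&'a mut T) -> BoxFuture<'a, Result<R, String>>,
{
    let mut last = Err(String::from("no attempts made"));
    for _ in 0..max_attempts {
        last = work(&mut *resource).await;
        if last.is_ok() {
            break;
        }
    }
    last
}

// Minimal busy-polling executor, only suitable for futures that never wait.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

fn block_on<Fut: Future>(fut: Fut) -> Fut::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(value) = fut.as_mut().poll(&mut cx) {
            return value;
        }
    }
}

fn run_boxed() -> Result<u32, String> {
    let mut client = Client { attempts: 0 };
    // The closure is passed directly to the helper, so its signature is
    // inferred from the bound and the Box is coerced to the dyn future.
    block_on(retry_boxed(&mut client, 5, |c| Box::pin(c.api_call())))
}
```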
