Async retry function

Hello,

I'd like to make a function that lets me retry an async function a certain number of times. I've taken a suggestion I saw on Stack Overflow:

```rust
use std::future::Future;

async fn retry<T, E, Fut, F>(retries: usize, mut f: F) -> Result<T, E>
where
    F: FnMut() -> Fut,
    Fut: Future<Output = Result<T, E>>,
{
    let mut count = 0;
    loop {
        let result = f().await;
        if result.is_ok() {
            return result;
        } else {
            count += 1;
            // Give up once the number of failed attempts reaches `retries`.
            if count >= retries {
                return result;
            }
        }
    }
}
```
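Note that with the `count >= retries` check, `retries` behaves like the maximum number of *attempts*, so `retry(1, ...)` calls `f` exactly once. The following sketch makes that concrete. It reuses `retry` as posted and drives it with a tiny hand-rolled `block_on`, which stands in for a real runtime such as Tokio. The `run` helper and its flaky operation are hypothetical and succeed on the third call.

```rust
use std::cell::Cell;
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

// Minimal stand-in executor: polls the future and parks the thread until woken.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
        thread::park();
    }
}

// `retry` as posted in the question.
async fn retry<T, E, Fut, F>(retries: usize, mut f: F) -> Result<T, E>
where
    F: FnMut() -> Fut,
    Fut: Future<Output = Result<T, E>>,
{
    let mut count = 0;
    loop {
        let result = f().await;
        if result.is_ok() {
            return result;
        } else {
            count += 1;
            if count >= retries {
                return result;
            }
        }
    }
}

/// Hypothetical flaky operation that succeeds on its third call.
/// Returns the final result and how many times the operation ran.
fn run(retries: usize) -> (Result<u32, &'static str>, u32) {
    let calls = Cell::new(0);
    // Only a shared borrow of `calls` ends up in each future, so this compiles.
    let result = block_on(retry(retries, || async {
        calls.set(calls.get() + 1);
        if calls.get() < 3 { Err("not yet") } else { Ok(calls.get()) }
    }));
    (result, calls.get())
}
```

`run(1)` makes a single call and returns the error. `run(5)` succeeds on the third call.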

It works as long as the async block only needs shared access to what the closure captures:

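```rust
use std::convert::Infallible;

struct Foo {}

impl Foo {
    async fn call(&self) -> Result<(), Infallible> {
        unimplemented!()
    }
}

async fn do_things() {
    let foo = Foo {};
    // Each future only holds a shared borrow of `foo`, which is allowed.
    retry(1, || async { foo.call().await }).await.unwrap();
}
```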

But it breaks when the async block needs mutable access to a captured variable, because the returned future would then hold a mutable borrow that outlives the closure call:

```rust
use std::convert::Infallible;

struct Foo {}

impl Foo {
    async fn call_mut(&mut self) -> Result<(), Infallible> {
        unimplemented!()
    }
}

async fn do_things() {
    let mut foo = Foo {};
    // Does not compile: each future would need `&mut foo`.
    retry(1, || async { foo.call_mut().await }).await.unwrap();
}
```

This leads to:

```text
error: captured variable cannot escape `FnMut` closure body
  --> src/lib.rs:43:17
   |
38 |     let mut foo = Foo {};
   |         ------- variable defined here
...
43 |     retry(1, || async { foo.call_mut().await }).await.unwrap();
   |               - ^^^^^^^^---^^^^^^^^^^^^^^^^^^^
   |               | |       |
   |               | |       variable captured here
   |               | returns an `async` block that contains a reference to a captured variable, which then escapes the closure body
   |               inferred to be a `FnMut` closure
   |
   = note: `FnMut` closures only have access to their captured variables while they are executing...
   = note: ...therefore, they cannot allow references to captured variables to escape
```

There is a workaround for that (see this rust-lang issue), which consists of wrapping `Foo` in an `Rc<RefCell<Foo>>`. The runtime borrow never conflicts, because `retry` never calls the closure again before the future it returned on the previous iteration has finished:

```rust
// Needs `use std::{cell::RefCell, rc::Rc};`
let rc_foo = Rc::new(RefCell::new(foo));
retry(1, || async { rc_foo.borrow_mut().call_mut().await })
    .await
    .unwrap();
```
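The following self-contained sketch confirms that the `Rc<RefCell<_>>` workaround really lets the closure run several times. It assumes a hypothetical `Foo` whose `call_mut` fails until its third call, tracked in an `attempts` field. It reuses `retry` from the question and replaces a real runtime with a tiny hand-rolled `block_on`.

```rust
use std::cell::RefCell;
use std::future::Future;
use std::pin::pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

// Minimal stand-in executor.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
        thread::park();
    }
}

// `retry` as posted in the question.
async fn retry<T, E, Fut, F>(retries: usize, mut f: F) -> Result<T, E>
where
    F: FnMut() -> Fut,
    Fut: Future<Output = Result<T, E>>,
{
    let mut count = 0;
    loop {
        let result = f().await;
        if result.is_ok() {
            return result;
        }
        count += 1;
        if count >= retries {
            return result;
        }
    }
}

// Hypothetical `Foo`: `call_mut` fails until it has been called three times.
struct Foo {
    attempts: u32,
}

impl Foo {
    async fn call_mut(&mut self) -> Result<u32, &'static str> {
        self.attempts += 1;
        if self.attempts < 3 { Err("not yet") } else { Ok(self.attempts) }
    }
}

fn run(retries: usize) -> (Result<u32, &'static str>, u32) {
    let rc_foo = Rc::new(RefCell::new(Foo { attempts: 0 }));
    // The closure only needs `&rc_foo`; exclusivity is checked at run time
    // by `borrow_mut()`, and the `RefMut` is released when each attempt ends.
    let result = block_on(retry(retries, || async { rc_foo.borrow_mut().call_mut().await }));
    let attempts = rc_foo.borrow().attempts;
    (result, attempts)
}
```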

But that's very unergonomic. Are there better solutions today on stable Rust?


The problem is that Rust's type system has no way to express the constraint "the future returned by this function will be dropped before this function is called again," and since that future must contain a &mut Foo referencing your foo variable, two copies of that future cannot coexist. Rust solves this problem by prohibiting references from escaping the closure, which prohibits creating that future.

Rc/Arc, Mutex, and other wrappers are all ways to move unique mutability checks from the compiler to your running program, and in this situation I don't think you can entirely avoid them. The other option would be to make `Foo` cloneable (`#[derive(Clone)]`) and call `call_mut` on a fresh clone each time. Any changes `call_mut` makes then happen on the clone and are lost. This allows:

```rust
retry(1, || async { foo.clone().call_mut().await }).await.unwrap();
```
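The cost of the clone-based option is easy to miss, so this sketch demonstrates it. It uses the same hypothetical `Foo`, whose `call_mut` only succeeds once its `attempts` counter reaches 3. Because every attempt starts from a fresh clone, that counter never advances. The retry loop therefore never succeeds, and the original `foo` stays untouched. `retry` is copied from the question, and `block_on` is a minimal stand-in for a real runtime.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
        thread::park();
    }
}

async fn retry<T, E, Fut, F>(retries: usize, mut f: F) -> Result<T, E>
where
    F: FnMut() -> Fut,
    Fut: Future<Output = Result<T, E>>,
{
    let mut count = 0;
    loop {
        let result = f().await;
        if result.is_ok() {
            return result;
        }
        count += 1;
        if count >= retries {
            return result;
        }
    }
}

// Hypothetical `Foo`: `call_mut` fails until `attempts` reaches 3.
#[derive(Clone)]
struct Foo {
    attempts: u32,
}

impl Foo {
    async fn call_mut(&mut self) -> Result<u32, &'static str> {
        self.attempts += 1;
        if self.attempts < 3 { Err("not yet") } else { Ok(self.attempts) }
    }
}

fn run(retries: usize) -> (Result<u32, &'static str>, u32) {
    let foo = Foo { attempts: 0 };
    // Every attempt mutates a brand-new clone that starts at `attempts: 0`.
    let result = block_on(retry(retries, || async { foo.clone().call_mut().await }));
    (result, foo.attempts)
}
```

Cloning is therefore only a fit when `call_mut` does not need to carry state from one attempt to the next.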

> the future returned by this function will be dropped before this function is called again

Is this what the `LendingFnMut` trait expresses?


Yes; I should have been more careful with my phrasing, so thank you.

Such a constraint is expressible, but not via the traits that closures today actually implement.
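Here is one way to get that guarantee with ordinary `FnMut` on stable Rust. This technique was not proposed in the thread and is shown only as an illustration. Instead of capturing `foo`, pass it to the closure as an argument, and have the closure return a boxed future. The higher-ranked bound `for<'a> FnMut(&'a mut S) -> Pin<Box<dyn Future<...> + 'a>>` ties each future to the borrow made for that single call, so the compiler can see that the borrow ends before the next call. The price is one heap allocation per attempt. `retry_with`, `Foo`, `run` and `block_on` are hypothetical names.

```rust
use std::future::Future;
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
        thread::park();
    }
}

/// Retries `f` on `state`; each future may borrow `state` only for one attempt.
async fn retry_with<S, T, E, F>(state: &mut S, retries: usize, mut f: F) -> Result<T, E>
where
    F: for<'a> FnMut(&'a mut S) -> Pin<Box<dyn Future<Output = Result<T, E>> + 'a>>,
{
    let mut count = 0;
    loop {
        // Explicit reborrow: it ends as soon as this attempt's future completes.
        let result = f(&mut *state).await;
        count += 1;
        if result.is_ok() || count >= retries {
            return result;
        }
    }
}

// Hypothetical `Foo`: `call_mut` fails until it has been called three times.
struct Foo {
    attempts: u32,
}

impl Foo {
    async fn call_mut(&mut self) -> Result<u32, &'static str> {
        self.attempts += 1;
        if self.attempts < 3 { Err("not yet") } else { Ok(self.attempts) }
    }
}

fn run(retries: usize) -> (Result<u32, &'static str>, u32) {
    let mut foo = Foo { attempts: 0 };
    let result = block_on(retry_with(&mut foo, retries, |foo| {
        Box::pin(async move { foo.call_mut().await })
    }));
    (result, foo.attempts)
}
```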

I had a similar problem, and while helping me with it, the ubiquitous @kpreid suggested I use the `async_fn_traits` crate. It's possible I'm misunderstanding the problem and what this crate solves, but @kpreid described it to me as:

> It lets you not have to mention the type of the future in your generics. This is always mildly awkward, and becomes a disaster if the future borrows from the function arguments (which it doesn't, but I still prefer the cleanliness of not having an extra, always-constrained generic parameter).
>
> ...
>
> when what type `Fut` is has a (lifetime) dependency on the parameters to `F`
> …er
>
> and when those parameters are (implicitly) `for<'a>` accepts-any-lifetime
>
> if the lifetimes are also parameters to the higher-order-function themselves, then they're concrete not generic for this purpose
>
> yeah okay it's not simple to explain 🙂

Hope this maybe helps!
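This next part concerns newer toolchains and postdates the thread. Rust 1.85 stabilized async closures (`async || ...`) together with the `AsyncFn`, `AsyncFnMut` and `AsyncFnOnce` traits. An `AsyncFnMut` bound also avoids naming the future type in the generics, which was kpreid's point. Unlike `|| async { ... }`, the future returned by an async closure may borrow from the closure's own captures, which is the lending behaviour the original question needs. The sketch assumes Rust 1.85 or later. `Foo`, `run` and `block_on` are illustrative stand-ins.

```rust
use std::future::Future;
use std::ops::AsyncFnMut;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
        thread::park();
    }
}

// No `Fut` parameter: the future type stays hidden behind `AsyncFnMut`.
async fn retry<T, E>(retries: usize, mut f: impl AsyncFnMut() -> Result<T, E>) -> Result<T, E> {
    let mut count = 0;
    loop {
        let result = f().await;
        count += 1;
        if result.is_ok() || count >= retries {
            return result;
        }
    }
}

// Hypothetical `Foo`: `call_mut` fails until it has been called three times.
struct Foo {
    attempts: u32,
}

impl Foo {
    async fn call_mut(&mut self) -> Result<u32, &'static str> {
        self.attempts += 1;
        if self.attempts < 3 { Err("not yet") } else { Ok(self.attempts) }
    }
}

fn run(retries: usize) -> (Result<u32, &'static str>, u32) {
    let mut foo = Foo { attempts: 0 };
    // `async ||` rather than `|| async`: the closure captures `&mut foo`,
    // and each future it returns reborrows from the closure itself.
    let result = block_on(retry(retries, async || foo.call_mut().await));
    (result, foo.attempts)
}
```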

Thanks @arifd, but to be honest I don't fully understand how this crate works. In the end, I opted for a much simpler approach: a macro.

Besides working, it has the benefit of supporting optional arguments (the retry count and the interval in milliseconds default to 5 and 100):

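```rust
/// Awaits `$f` (a future such as an `async` block), retrying up to `$count`
/// more times and sleeping `$interval` milliseconds between attempts.
macro_rules! retry {
    ($f:expr, $count:expr, $interval:expr) => {{
        let mut retries = 0;
        loop {
            // `$f` is expanded inside the loop, so a fresh future is built each time.
            let result = $f.await;
            if result.is_ok() || retries >= $count {
                break result;
            }
            retries += 1;
            tokio::time::sleep(std::time::Duration::from_millis($interval)).await;
        }
    }};
    ($f:expr) => {
        retry!($f, 5, 100)
    };
}
```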

Note that the macro still has limitations. For instance, I'd like to be able to do:

```rust
retry! {{
    foo.call_mut().await?;
    foo.call_mut().await?;
    foo.call_mut().await
}}.unwrap()
```

But that fails with:

```text
error[E0277]: the `?` operator can only be used in an async function that returns `Result` or `Option` (or another type that implements `FromResidual`)
  --> src/lib.rs:39:25
   |
36 |   async fn do_things() {
   |  ______________________-
37 | |     let mut foo = Foo {};
38 | | retry! {{
39 | |     foo.call_mut().await?;
   | |                         ^ cannot use the `?` operator in an async function that returns `()`
...  |
42 | | }}.unwrap()
43 | | }
   | |_- this function should return `Result` or `Option` to accept `?`
```

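Wrapping the body in an `async` block does work, because `?` then returns from the block instead of from `do_things`: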

```rust
retry! {async {
    foo.call_mut().await?;
    foo.call_mut().await?;
    foo.call_mut().await
}}.unwrap()
```
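The following sketch runs that pattern without Tokio. `retry_async!` is a hypothetical, simplified variant of the macro: it awaits `$f` on every iteration and drops the sleep. `Foo` and `block_on` are illustrative stand-ins. Each `?` exits only the current `async` block, and the block's `Result` is what the loop inspects.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
        thread::park();
    }
}

// Hypothetical simplified macro: no sleep between attempts.
macro_rules! retry_async {
    ($f:expr, $count:expr) => {{
        let mut retries = 0;
        loop {
            // A fresh future is built from `$f` on every iteration.
            let result = $f.await;
            if result.is_ok() || retries >= $count {
                break result;
            }
            retries += 1;
        }
    }};
}

// Hypothetical `Foo`: `call_mut` fails until it has been called three times.
struct Foo {
    attempts: u32,
}

impl Foo {
    async fn call_mut(&mut self) -> Result<u32, &'static str> {
        self.attempts += 1;
        if self.attempts < 3 { Err("not yet") } else { Ok(self.attempts) }
    }
}

async fn do_things(foo: &mut Foo) -> Result<u32, &'static str> {
    retry_async!(async {
        foo.call_mut().await?;
        foo.call_mut().await?;
        foo.call_mut().await
    }, 5)
}
```

Here the first two attempts fail on their first `call_mut`, and the third attempt runs all three calls successfully.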


Yes, and the macro also lets you pass in a literal that you can forward to your logging, so you know where it was called from, since `#[track_caller]` does not work for futures.
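The following is a small synchronous sketch of that idea. `retry_logged!` is a hypothetical macro that takes a caller-supplied label literal and, on every failed attempt, records it together with `file!()` and `line!()`. Inside a `macro_rules!` body, those built-ins report the location of the outermost macro invocation, so the log points at the caller. Pushing strings into a `Vec<String>` stands in for a real logger such as `tracing`.

```rust
// Hypothetical macro: synchronous retry that tags each failure with a label
// and the call site's location.
macro_rules! retry_logged {
    ($label:literal, $log:expr, $count:expr, $f:expr) => {{
        let mut retries = 0;
        loop {
            match $f {
                Ok(value) => break Ok(value),
                Err(err) => {
                    // `file!()`/`line!()` resolve to the `retry_logged!` call site.
                    $log.push(format!(
                        "{} at {}:{}: attempt {} failed",
                        $label,
                        file!(),
                        line!(),
                        retries + 1
                    ));
                    if retries >= $count {
                        break Err(err);
                    }
                    retries += 1;
                }
            }
        }
    }};
}
```

The log lines are the usage example: the operation below fails twice, producing two entries that each start with the `"fetch"` label.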

Here's what I'm working with right now, and I'm very happy with it:

```rust
use std::future::Future;
use std::time::{Duration, Instant};

use rand::Rng;
use tracing::error;

/// Repeatedly calls `f` and awaits the [`Future`] it returns until one
/// succeeds, yielding that future's `Ok` value.
///
/// This function is meant for tasks that communicate with services where
/// an error indicates a temporary service failure.
///
/// Note that `retry_on_err` will keep retrying indefinitely unless cancelled,
/// potentially leading to accumulation of tasks that cannot make progress.
/// Ensure that such tasks will eventually be cancelled, or will be finite in
/// number, to avoid resource exhaustion.
///
/// # Examples
/// ```no_run
/// let value = retry_on_err(|| async {
///     contact_remote_service().await
/// })
/// .await;
/// ```
pub async fn retry_on_err<T, E, F, Fut>(f: F) -> T
where
    F: Fn() -> Fut,
    Fut: Future<Output = Result<T, E>>,
{
    let now = Instant::now();
    let mut backoff = Duration::from_millis(500);
    let factor = 1.5;
    let limit = Duration::from_secs(60 * 2);
    let warn = Duration::from_secs(60 * 60);
    // `OsRng` is `Send`, so holding it across `.await` keeps this future `Send`.
    let mut rng = rand::rngs::OsRng;

    loop {
        match f().await {
            Ok(val) => return val,
            Err(_) => {
                let elapsed = now.elapsed();
                if elapsed > warn {
                    let elapsed = humantime::format_duration(elapsed);
                    error!(%elapsed);
                }
                // Grow the delay geometrically, capped at `limit`,
                // then add up to one backoff period of random jitter.
                backoff = backoff.mul_f32(factor).min(limit);
                let jitter = rng.gen_range(Duration::ZERO..backoff);
                tokio::time::sleep(backoff + jitter).await;
            }
        }
    }
}
```
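Suppose the delay is meant to grow geometrically, being multiplied by `factor` after each failure and capped at `limit`. In that case, the base delays before jitter follow the schedule computed by the hypothetical std-only helper `backoff_schedule` below, which is handy for sanity-checking the constants. With 500 ms, 1.5 and a 2-minute cap, the first retry waits 750 ms. The 14th retry is the first whose base delay reaches the cap.

```rust
use std::time::Duration;

/// Hypothetical helper: base delays (before jitter) for the first `retries`
/// retries, each one `factor` times the previous, capped at `limit`.
fn backoff_schedule(initial: Duration, factor: f32, limit: Duration, retries: usize) -> Vec<Duration> {
    let mut backoff = initial;
    let mut delays = Vec::with_capacity(retries);
    for _ in 0..retries {
        backoff = backoff.mul_f32(factor).min(limit);
        delays.push(backoff);
    }
    delays
}
```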

Note that this uses `OsRng` rather than `thread_rng()`. `ThreadRng` is not `Send`, so holding it across an `.await` would make the future non-`Send`, and it could then not be used with something like `tokio::spawn`, which requires `Send`.

