Async closure doesn't implement FnMut error

I got compile error when I tried to mutate variable.
The error is

 
⣿
Errors

Exited with status 101

Standard Error

   Compiling playground v0.0.1 (/playground)
error: async closure does not implement `FnMut` because it captures state from its environment
  --> src/main.rs:51:42
   |
51 |     let mut stream = pin!(StreamObj::new(async || {
   |                           -------------- ^^^^^^^^
   |                           |
   |                           required by a bound introduced by this call
   |
note: required by a bound in `StreamObj::<F, FUT, O>::new`
  --> src/main.rs:14:8
   |
14 |     F: FnMut() -> FUT,
   |        ^^^^^^^^^^^^^^ required by this bound in `StreamObj::<F, FUT, O>::new`
...
17 |     pub fn new(fetcher: F) -> Self {
   |            --- required by a bound in this associated function

error: could not compile `playground` (bin "playground") due to 1 previous error

Standard Output

If I don't mutate variable then the error disappear.
My questions:

  1. Is there a better way to construct Stream that fetch data using async library beside splitting generic from AsyncFnMut() -> O into two generic F and Fut in order for me to keep future return by such library ?
  2. Is there a way to force async closure to be FnMut ? It seem like the error message is telling me that this closure is not FnMut or AsyncFnMut. Did I misunderstood the compile error message ?

My experimental playground:

Note that, I also tried making a function that return the an implementation of Stream.
Here's the playground that I did: Rust Playground

The snippet I of that function look like this:

fn make_stream() -> impl Stream<Item = String> {
    let some_url = "host";
    StreamObj::new(async move || {
        let full_url = format!("{}/{}", some_url, "path");
        // simulate result
        full_url
    })
}

It also give me the same error:

   Compiling playground v0.0.1 (/playground)
error: async closure does not implement `FnMut` because it captures state from its environment
  --> src/main.rs:49:20
   |
49 |     StreamObj::new(async move || {
   |     -------------- ^^^^^^^^^^^^^
   |     |
   |     required by a bound introduced by this call
   |
note: required by a bound in `StreamObj::<F, FUT, O>::new`
  --> src/main.rs:14:8
   |
14 |     F: FnMut() -> FUT,
   |        ^^^^^^^^^^^^^^ required by this bound in `StreamObj::<F, FUT, O>::new`
...
17 |     pub fn new(fetcher: F) -> Self {
   |            --- required by a bound in this associated function

error: could not compile `playground` (bin "playground") due to 1 previous error

These are two different things.

  • FnMut() -> Fut is expecting a callable returning a Future.
  • AsyncFnMut() is expecting an async closure.

With async move || you get async closure, with move || async move you get closure returning a Future.

1 Like

Is this mean the need of two generic to do fetch for stream work a way to go ?

Your explanation to use move || async move {} did solve both of my playground issues but further exploration with second playground lead me to another error if I borrow mut variable.
My new make_stream fn look like

fn make_stream(counter: & mut usize) -> impl Stream<Item = String> {
    let some_url = "host";
    StreamObj::new(move || async move {
        let full_url = format!("{}/{}", some_url, "path");
        *counter += 1;
        // simulate result
        full_url
    })
}

It yield this error

   Compiling playground v0.0.1 (/playground)
error[E0507]: cannot move out of `counter`, a captured variable in an `FnMut` closure
  --> src/main.rs:49:28
   |
47 | fn make_stream(counter: & mut usize) -> impl Stream<Item = String> {
   |                ------- captured outer variable
48 |     let some_url = "host";
49 |     StreamObj::new(move || async move {
   |                    ------- ^^^^^^^^^^ `counter` is moved here
   |                    |
   |                    captured by this `FnMut` closure
50 |         let full_url = format!("{}/{}", some_url, "path");
51 |         *counter += 1;
   |         --------
   |         |
   |         variable moved due to use in coroutine
   |         move occurs because `counter` has type `&mut usize`, which does not implement the `Copy` trait

For more information about this error, try `rustc --explain E0507`.

My playground link: Rust Playground

Is my understanding of that error correct ?

fn make_stream(counter: & usize) -> impl Stream<Item = String> {
    let some_url = "host".to_owned();
    StreamObj::new(move || async move {
        let full_url = format!("{}/{}?counter={}", some_url.clone(), "path", *counter);
        // simulate result
        full_url   
    })
}

It is equivalence to

fn make_stream(counter: & usize) -> impl Stream<Item = String> {
    let some_url = "host".to_owned();
    StreamObj::new(move || {
        let temp = some_url;
        let ptr = counter;
        async move {
            let full_url = format!("{}/{}?counter={}", temp, "path", *ptr);
            // simulate result
            full_url   
        }
    })
}

If it is equivalence. I can make sense out of what actually causing an issue.
Notice that I change type of capture variable from &'static str into String. Since String is not implement Copy, even without borrowed mut variable, it ask me to clone value because it is moved.
Which mean that the reason move || async move {} work in the first place is by copy value twice. One is copy into move || {} closure then it copy again to async move {} statement.
When it is String, It doesn't implement copy. I need to clone it into temp variable first then async move {} capture the cloned value instead of the original String.
The working code is:

fn make_stream(counter: & usize) -> impl Stream<Item = String> {
    let some_url = "host".to_owned();
    StreamObj::new(move || {
        let temp = some_url.clone();
        let ptr = counter;
        async move {
            let full_url = format!("{}/{}?counter={}", temp, "path", *ptr);
            // simulate result
            full_url   
        }
    })
}

The problem with &mut usize is that it doesn't implement Copy nor Clone. If I turn &mut usize to &usize then the problem is gone because &T does implement Copy.

If my understanding is correct, even with async move || {}, it sill impossible to capture &mut T. Am I correct ?

Your understanding around closure returning future is correct. But async closure actually can capture &mut T.

The core difference is that, what happens when the closure is called twice (or more).

Consider f: FnMut() -> Fut case first, there's no lifetime attached to the Fut related to f, so you can write this:

let fut1 = f();
let fut2 = f();
fut1.await;
fut2.await;

Obviously fut1 or fut2 can't capture a &mut T, otherwise they would mutably borrowing the same T at the same time.

But with af: AsyncFnMut(), you can't write that! Compiler will give out an error like this:

   |
9  |     let f1 = af();
   |              -- first mutable borrow occurs here
10 |     let f2 = af();
   |              ^^ second mutable borrow occurs here
11 |     f1.await;
   |     -- first borrow later used here

This is because f1 and f2 (mutably) borrows af, so they can't co-exist, which allows af capturing a &mut T.

It's like the difference between Iterator and (persumed) LendingIterator. Calling FnMut() -> Fut is like calling fn call(&mut self) -> Fut on the closure, but calling AsyncFnMut() is like calling fn call<'a>(&'a mut self) -> Fut<'a>.

Hope this cleans thing up more.

1 Like

Thanks for explanation about example of fn call<'a>(&'a mut self) -> Fut<'a>.
This lead me back to original question of implementing Stream.
My current implementation make use of FnMut() -> Fut like code below.

struct StreamObj<F, FUT, O>
where
    F: FnMut() -> FUT,
    FUT: Future<Output = O>
{
    fetcher: F,
    fetching: Option<FUT>
}

This is because I need to keep Future in my Stream implementation to keep polling it to see if the Future yield any value. It lead me to issue of unable to borrow mut the environment.

Is there a way for me to store Future return from AsyncFnMut() -> T in my implementation ? I test by using F::CallRefFuture but it complain that this feature is not available in stable Rust along with require a lifetime parameter.

Let's say you want to support the lending call<'a>(&'a mut self) -> Fut<'a> capability. Consider what would happen here:

    // let's say that `self: Pin<&'m mut Self>`
    fn poll_next(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // This is a `&'m mut Self` (or smaller lifetime)
        let mut_self = unsafe {Pin::get_unchecked_mut(self)};
        // ...
            // This is an `Option<Fut<'m>>` (or smaller lifetime)
            mut_self.fetching = Some((mut_self.fetcher)());
            ctx.waker().wake_by_ref();
            Poll::Pending
    }

The signature of poll_next requires that the method work for all 'm such that Self: 'm. That is, for arbitrarily short lifetimes 'm, so long as they're valid for the duration of the call. In other words: you can't use or store the result of (mut_self.fetcher)() longer than the method body of poll_next; that's the maximum duration you can assume the future is valid.[1]

So this approach of storing the future to try again later, after some fresh call to the method, isn't compatible with the lending capability.


That's more or less right. Which means, if you can use shared (interior) mutability, you may not need the lending capability. Consider this modification of your OP playground:

-    let mut counter = 0usize;
+    let counter = std::cell::Cell::new(0usize);
     let mut stream = pin!(StreamObj::new(async || {
         let full_url = format!("{}/{}", some_url, "path");
-        counter += 1;
+        counter.set(counter.get() + 1);

Why does it work? The async closure doesn't need the lending capability for the reason you pointed out: the future it returns can contain a copy of the captured shared reference &'x Cell<_> with the same lifetime 'x -- because shared references implement Copy.

That is, you have something like a call<'a>(&'a mut TheClosure<'x>) -> Fut, where returned future contains some copy of &'x Cell<_>, but has the same type for every call (independent of 'a).

And in contrast you can only reborrow through a &'a mut &'x mut usize for 'a,[2] resulting in a lending signature call<'a>(&'a mut TheClosure<'x>) -> Fut<'a>, where the type of the returned future is not independent of 'a; each call effectively returns a different type.


  1. There's also no way for 'm to be part of the definition of Self, like part of some type parameter Fut. ↩︎

  2. you can reborrow at most a &'a mut usize ↩︎

Thanks for elaboration of async || point.
If I understand your point correctly, call<'a>(&'a mut TheClosure<'x>) -> Fut always return the same Fut because regardless of any lifetime, Fut will still be Fut as it will always contains identical reference to Cell.
While call<'a>(&'a mut TheClosure<'x>) -> Fut<'a> will produce difference Fut at each call site because that &'a mut will be a different one everytime because &'a mut is not copy nor clonable which mean it is completely different from previous borrow mut in prior call.

Am I understand this point correctly ?

I think you have the idea. In terms of closures:

let a = async || { counter += 1; full_url };
let b = async || { counter.set(counter.get() + 1); full_url };

The reason why a ends up with an implementation that looks like call<'a>(&'a mut TheClosure<'x>) -> Fut<'a> is because &mut &mut i32 is not copy or clonable, and you can only reborrow counter for the outer lifetime 'a in the implementation. We'd say this is a lending function signature.

Whereas b can have an implementation that looks like call<'a>(&'a mut TheClosure<'x>) -> Fut because it can just copy the captured &'a Cell<i32> when returning the future. The Fut doesn't have to capture the outer lifetime 'a. We'd say it's a non-lending function signature.

So for this example, the semantics of &mut &mut _ reborrows is the reason for the different effective signatures.

More generally, which kind of effective signature you get -- lending or not -- is inferred from the async closure body; there are surely other examples that end up having to be lending for other reasons.

1 Like

Let me recap everything so far to answer my original questions.

Is there a better way to construct Stream that fetch data using async library beside splitting generic from AsyncFnMut() -> O into two generic F and Fut in order for me to keep future return by such library ?

O will always change if the async closure contains ref mut. ref mut required a new one every time it is called because Future being construct contains a new ref mut.
Unlike ref due to reason that ref itself can be copy into each Future return from async closures. It will be of the exact same type by copying the same ref.

The return type and lifetime of AsyncFn and AsyncFnMut is also available only via nightly Rust.

Is there a way to force async closure to be FnMut ? It seem like the error message is telling me that this closure is not FnMut or AsyncFnMut. Did I misunderstood the compile error message ?

It is of different type.
Quoting:

These are two different things.

  • FnMut() -> Fut is expecting a callable returning a Future.
  • AsyncFnMut() is expecting an async closure.

With async move || you get async closure, with move || async move you get closure > returning a Future.

There's some compiler magic going on for the case of non-mutable borrowing.
For example, this work:

    let some_url = "host".to_owned();
    let counter = 0usize;
    let mut stream = pin!(StreamObj::new(async || {
        let full_url = format!("{}/{}", some_url, counter);
        // simulate result
        full_url
    }));

Because compiler know exactly how to borrow their surrounding variable.
However, changing from async || {} to async move || {} won't work.
Compiler will return async closure does not implement FnMut because it captures state from its environment.
This is because FnMut supposed to be callable multiple time. In this particular case, some_url is not Copy so it can't be called multiple time as it will be moved into Future in the first call so compiler can't do it magic.

Is my understanding correct ?

Out of my own curiosity,
If AsyncFnMut() -> O always return different Future if it is mutably borrow something, why my experiment with this one work ? and is it ok to use unsafe this way ?

struct StreamObj<'a, F, O>
where
    F: 'a + AsyncFnMut() -> O
{
    fetcher: F,
    fetching: Option<F::CallRefFuture<'a>>
}
impl<'a, F, O> StreamObj<'a, F, O>
where
    F: 'a + AsyncFnMut() -> O
{
    pub fn new(fetcher: F) -> Self {
        Self {
            fetcher,
            fetching: None
        }
    }
}
impl<'a, F, O> Stream for StreamObj<'a, F, O>
where
    F: 'a + AsyncFnMut() -> O
{
    type Item = O;
    fn poll_next(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        unsafe {
            let mut_self = Pin::get_unchecked_mut(self);
            let raw_self = &raw mut *mut_self;
            if let Some(fut_state) = mut_self.fetching.as_mut() {
                match Pin::new_unchecked(fut_state).poll(ctx) {
                    Poll::Ready(o) => {
                        mut_self.fetching = None;
                        Poll::Ready(Some(o))
                    },
                    Poll::Pending => Poll::Pending
                }
            } else {
                mut_self.fetching = Some(((*raw_self).fetcher)());
                ctx.waker().wake_by_ref();
                Poll::Pending
            }
        }
    }
}
fn make_closure(some_url: &String, counter: &mut usize) -> impl AsyncFnMut() -> String {
    async || {
        let full_url = format!("{}/{}", *some_url, "path");
        *counter += 1;
        full_url
    }
}
#[tokio::main]
async fn main() {
    let some_url = "host".to_owned();
    let mut counter = 0usize;
    {
        let mut stream = pin!(StreamObj::new(make_closure(&some_url, &mut counter)));
        println!("{}", stream.next().await.unwrap());
    }
    println!("{}", counter);
}

Note that it need to drop stream before last println, otherwise, compiler complain that counter is currently borrowed mut by stream.

Playground link: Rust Playground

If the future it returns does, yes.

It's an implementation of a different trait, not a different type. If the closure doesn't need to be lending -- if the future it returns doesn't need to borrow from the closure itself -- then the closure can implement both AsyncFnMut and FnMut.

Because you effectively transmuted a lifetime to extend an exclusive borrow that was supposed end at the end of the method. And no, it's generally not sound.

I'm trying to understand your point about soundness.
I wrote some code in this Rust Playground to my understanding about how async || {} work.

The key important thing that do lifetime check you mentioned seem to come from this:

impl<'a> AsyncClosure for MyAsyncClosure<'a> {
    type Fut<'x> = FutureObj<&'x mut String> where Self: 'x;
    fn call(&mut self) -> Self::Fut<'_> {
        FutureObj {
            context: &mut *self.borrow_mut
        }
    }
}

If I omit <'x> GAT, compiler immediately complain at the time I construct FutureObj.
With GAT, it move compiler complain to self.f = Some((self.c).call()); because f is declared to have lifetime 'a which seem to be a wrong lifetime for it.
If my understanding is correct, Is there any correct way to specify correct lifetime for it ?

To store in a struct? You can't, because the lifetime only exists in the body of the method, and your struct must be defined and exist outside of that. There may be some some sound way to do what you're attempting with a very solid understanding of the Pin requirements, and what your async executer guarantees, but even then I'd avoid lifetimes and go with pointers.

(I can't really help you with that, I'm afraid. I'm not sure if there is a sound way or not, in case that wasn't clear. You'd be in the land of using Pin and unsafe to construct something self-referencial but sound; I'd look for a dedicated guide.)

Thanks for your answers.
It seem that I need to find a workaround beside completely rely on closure for fetching data for stream.