Capturing by value in `FnMut`

use tokio; // 1.16.1

use std::sync::Arc;
use tokio::sync::mpsc;
use tokio::runtime::Runtime;

// Third-party code
fn call_fn_mut(mut f: impl FnMut(String) + Send + 'static) {
    f(String::from("hello"));
}

fn blocking_send(rt: Arc<Runtime>, tx: mpsc::Sender<String>, msg: String) {
    rt.block_on(tx.send(msg)).unwrap();
}

fn main() {
    let (tx, mut rx) = mpsc::channel(1);
    
    let rt = Arc::new(Runtime::new().unwrap());
    // let rt_cloned = rt.clone(); // Won't compile without this
    // call_fn_mut(move |msg| blocking_send(rt_cloned.clone(), tx.clone(), msg));
    call_fn_mut(move |msg| blocking_send(rt.clone(), tx.clone(), msg));
    
    rt.block_on(async {
        println!("{}", rx.recv().await.unwrap());
    });
}

(Playground)

Errors:

   Compiling playground v0.0.1 (/playground)
error[E0382]: borrow of moved value: `rt`
  --> src/main.rs:24:5
   |
19 |       let rt = Arc::new(Runtime::new().unwrap());
   |           -- move occurs because `rt` has type `Arc<Runtime>`, which does not implement the `Copy` trait
...
22 |       call_fn_mut(move |msg| blocking_send(rt.clone(), tx.clone(), msg));
   |                   ----------               -- variable moved due to use in closure
   |                   |
   |                   value moved into closure here
23 |       
24 | /     rt.block_on(async {
25 | |         println!("{}", rx.recv().await.unwrap());
26 | |     });
   | |______^ value borrowed here after move
   |
   = note: borrow occurs due to deref coercion to `Runtime`

For more information about this error, try `rustc --explain E0382`.
error: could not compile `playground` due to previous error

Here I expect rt to be .clone()ed first, then moved into the FnMut clousre, but it seems not.

Actually, I have to make a let rt_cloned = rt.clone() manually, and then pass it into the clousre via rt_cloned.clone(), just as the two statements where I have commented out.

I'm not sure whether it is designed to behave like that or it is a bug, could anyone help?

It's by design. By making the closure move, it moved all the captured variables into the closure; for any variables that aren't Copy, this means you don't own them any more and thus cannot use them. So for an Arc, you need to capture a clone instead. (Fortunately, that's a cheap operation.)

There is no "capture-by-cloning" closures, you have to arrange it yourself. Also, I'm not sure if this is where the confusion comes in or not, but note that closure bodies aren't executed until they are called, just like function bodies. That closure clones its captured Arc every time you call it, but not until you call it. (And you're not calling it here, you're passing it to someone else, who may call it later or not.)

Thanks for your answer!
By investigating the MIR, I realized that .clone()ed values in the colsures might behave like this:

struct Closure {
    rt: Arc<Runtime>,
    tx: mpsc::Sender<String>,
}
impl FnMut<(String,)> for Closure {
    fn call_mut(&mut self, (msg,) : (String,)) {
        blocking_send(self.rt.clone(), self.tx.clone(), msg);
    }
}
call_mut(Closure {
    rt: rt.clone(),
    tx: tx.clone(),
});

I changed the signature of blocking_send to be

fn blocking_send(rt: &Arc<Runtime>, tx: &mpsc::Sender<String>, msg: String) { /* ... */ }

and make the clousure

call_fn_mut(move |msg| blocking_send(&rt.clone(), &tx.clone(), msg));

I then expect the clousre to be desugared as:

struct Closure {
    rt: Arc<Runtime>,
    tx: mpsc::Sender<String>,
}
impl FnMut<(String,)> for Closure {
    fn call_mut(&mut self, (msg,) : (String,)) {
        blocking_send(self.rt, self.tx, msg); // No clone for `self.rt` and `self.tx`
    }
}
call_mut(Closure {
    rt: rt.clone(), // passed by clone
    tx: tx.clone(), // passed by clone
});

But it failed to compile, and the captured rt and tx are .clone()ed every time when the closure is called.
Playground for new code.

When you use move, you're still capturing the variables you mention, even if you put a & in front of them. If you want to capture references, you can

  • Not use move (just comment it out)
  • Use move but capture a variable which is a reference
    let rt_ref = &rt;
    let tx_ref = &tx;
    call_fn_mut(move |msg| blocking_send(rt_ref, tx_ref, msg));
    // Or
    call_fn_mut(move |msg| blocking_send(rt_ref.clone(), tx_ref.clone(), msg));
    

However, that's not going to work with call_fn_mut:

// Third-party code
fn call_fn_mut(mut f: impl FnMut(String) + Send + 'static) {

Because the + 'static bound means you can't call it with a closure that has captured any (non-'static) references. If you had captured references, the closure struct would conceptually look something like:

struct Closure<'a> {
    rt_ref: &'a Arc<Runtime>,
    tx_ref: &'a mpsc::Sender<String>,
}

And that lifetime 'a only lasts as long as your local variables, not 'static, aka forever. (But you didn't actually capture references in your code.)

As I try to illustrate above, what you capture will effect what the anonymous struct Closure looks like [1]. But it's not going to rewrite your code to remove the calls to .clone().

So body of the your function still looks something like

impl FnMut<(String,)> for Closure {
    fn call_mut(&mut self, (msg,) : (String,)) {
        blocking_send(&self.rt.clone(), &self.tx.clone(), msg);
    }
}

Because, well, that's what corresponds the body of the closure which you wrote. If the goal is to not clone within the capture, you'll need to not call .clone(). [2]


Here's a version that

  • Captures tx still
  • Captures a clone of the rc
    • So does not capture references, and is 'static
  • But passes references to blocking_send
    • So does not call clone inside the closure
    let rt = Arc::new(Runtime::new().unwrap());
    let rt_clone = rt.clone();
    call_fn_mut(move |msg| blocking_send(&rt_clone, &tx, msg));

The closure is still something like

struct Closure {
    rt_clone: Arc<Runtime>,
    tx: mpsc::Sender<String>,
}

Because that's what I captured (using move). But now the body is like

impl FnMut<(String,)> for Closure {
    fn call_mut(&mut self, (msg,) : (String,)) {
        blocking_send(&self.rt_clone, &self.tx, msg);
    }
}

corresponding to the body of the closure.


  1. if you capture references, the fields will be references, and the struct will have a lifetime that is limited by those references ↩︎

  2. Due to the lifetime bound discussed above, you'll still have to make a clone for the closure to capture, if you want to still use rt afterwards. ↩︎

4 Likes

That's exactly what I want, cool! Thank you very much!

Note that the tokio::sync::mpsc::Sender type has a function called blocking_send built-in.

Yes, I've noticed that. But I was trying to call blocking_send in an async {} block (for some reason, the blocking_send method shoud be non-async), and the tokio runtime panics, so I replaced tokio::mpsc with std::sync::mpsc later.

Since my mpsc channel is used in test codes, it is acceptable.

Using any kind of blocking send in async code is a real bad idea. Read this for why. Instead, you should .await the call to send.

2 Likes

I agree with you. This was because I used a single trait method express both sync and async ways, such as:

enum Mode {
    Sync,
    Async,
}
trait Foo {
    fn foo(&self, mode: Mode);
}
impl Foo for MyFoo {
    fn foo(&self, mode: Mode) {
        match mode {
            Mode::Sync => // do something synchronizedly.
            Mode::Async => // and here is where `blocking_send` called.
        }
    }
}

But later when I was writing tests, I realized it might be a bad idea to mix up the two things together, and I decided to separate them into a sync method and an async method. Unfortunately, async trait are not stable yet, so I have to use the async_trait crate.

#[async_trait::async_trait]
trait Foo {
    fn foo(&self);
    async fn foo_async(&self);
}
#[async_trait::async_trait]
impl Foo for MyFoo {
    fn foo(&self) {
        // do something synchronizedly.
    }
    async fn foo_async(&self) {
        // `.send().await` is used.
    }
}

After all, it will be OK to call foo or foo_async in both sync and async code.

Calling foo from async certainly wouldn't be ok if foo blocks.

What if withspawn_blocking?

Then it's fine, like the article I linked earlier explains.

1 Like

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.