How to share an Arc<dyn Fn(u8)> between threads

I'm trying to design a simple struct that runs a thread of its own. My use case is a video decoder: I create the struct, set the on_produce function, and the struct then delivers video frames through this callback. I first used Box<dyn Fn(u8)> and assumed that was why I could not share it between threads, but even moving to Arc didn't fix the error.

use std::sync::Arc;
use std::thread::{JoinHandle, sleep, spawn};
use std::time::Duration;

pub struct A {
    on_produce: Arc<dyn Fn(u8)>,
    run_thread: Option<JoinHandle<()>>
}

impl A {
    fn run(&self) {
        self.run_thread = Some(spawn(move || {
            loop {
                (self.on_produce)(0);
                sleep(Duration::from_millis(100));
            }
        }));
    }
}
impl Drop for A {
  fn drop(&mut self) {
    self.run_thread.unwrap().join();
  }
}

Error:

error[E0277]: `(dyn std::ops::Fn(u8) + 'static)` cannot be sent between threads safely
   --> src/lib.rs:12:32
    |
12  |         self.run_thread = Some(spawn(move || {
    |                                ^^^^^ `(dyn std::ops::Fn(u8) + 'static)` cannot be sent between threads safely
    |
    = help: the trait `std::marker::Send` is not implemented for `(dyn std::ops::Fn(u8) + 'static)`
    = note: required because of the requirements on the impl of `std::marker::Sync` for `std::sync::Arc<(dyn std::ops::Fn(u8) + 'static)>`
    = note: required because it appears within the type `A`
    = note: required because of the requirements on the impl of `std::marker::Send` for `&A`
    = note: required because it appears within the type `[closure@src/lib.rs:12:38: 17:10 self:&A]`

How can I share an Arc<dyn Fn(u8)> between threads? I thought Arc was meant for exactly that.

Since not every type that implements Fn(u8) is safe to share between threads, you need additional bounds on your dyn Trait. Use Arc<dyn Fn(u8) + Send + Sync + 'static> instead.

In your error message, the compiler says that Send is not implemented for dyn Fn(u8) + 'static.

    = help: the trait `std::marker::Send` is not implemented for `(dyn std::ops::Fn(u8) + 'static)`

This is a real restriction: not all functions are thread-safe. Consider this closure, for instance.

use std::cell::Cell;
use std::sync::Arc;
let cell = Arc::new(Cell::new(0));
let cell_clone = Arc::clone(&cell);
let f = move || cell_clone.set(cell_clone.get() + 1);

This closure cannot be sent to other threads, because doing so would allow data races when it is called from multiple threads at the same time. It's perfectly fine in a single-threaded program, however.
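For contrast, here is a minimal sketch of a thread-safe counterpart, assuming an atomic counter is an acceptable stand-in for the Cell. The helper name count_from_threads is made up for illustration. Because AtomicU32 is Sync, a closure that captures an Arc<AtomicU32> is both Send and Sync, so it can be stored behind Arc<dyn Fn() + Send + Sync> and called from several threads at once.

```rust
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
use std::thread;

/// Hypothetical helper: calls one shared closure from `threads` threads
/// and returns the final counter value.
fn count_from_threads(threads: u32) -> u32 {
    let counter = Arc::new(AtomicU32::new(0));
    let counter_clone = Arc::clone(&counter);

    // The closure captures only an Arc<AtomicU32>, so it is Send + Sync.
    let f: Arc<dyn Fn() + Send + Sync> = Arc::new(move || {
        counter_clone.fetch_add(1, Ordering::SeqCst);
    });

    // Each thread gets its own Arc handle to the same closure.
    let handles: Vec<_> = (0..threads)
        .map(|_| {
            let f = Arc::clone(&f);
            thread::spawn(move || f())
        })
        .collect();

    for handle in handles {
        handle.join().expect("worker thread panicked");
    }
    counter.load(Ordering::SeqCst)
}
```

Swapping AtomicU32 back to Cell<u32> in this sketch should bring back the same kind of E0277 error, since Cell is not Sync.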

dyn Fn(u8) represents a function but says nothing about its thread safety. To tell the compiler that the function must be able to move to another thread (which thread::spawn requires), you need to say that it implements Send by writing dyn Fn(u8) + Send.

    on_produce: Arc<dyn Fn(u8) + Send>,

This causes an error.

error[E0277]: `(dyn std::ops::Fn(u8) + std::marker::Send + 'static)` cannot be shared between threads safely
   --> src/lib.rs:12:32
    |
12  |         self.run_thread = Some(spawn(move || loop {
    |                                ^^^^^ `(dyn std::ops::Fn(u8) + std::marker::Send + 'static)` cannot be shared between threads safely
    |
    = help: the trait `std::marker::Sync` is not implemented for `(dyn std::ops::Fn(u8) + std::marker::Send + 'static)`
    = note: required because of the requirements on the impl of `std::marker::Sync` for `std::sync::Arc<(dyn std::ops::Fn(u8) + std::marker::Send + 'static)>`
    = note: required because it appears within the type `A`
    = note: required because of the requirements on the impl of `std::marker::Send` for `&A`
    = note: required because it appears within the type `[closure@src/lib.rs:12:38: 15:10 self:&A]`

Send allows an object to be sent to another thread, but that's all it does. Here the closure captures &A, and &A is Send only if A is Sync, which in turn requires the contents of the Arc to be Sync. So to call an object that is shared between multiple threads, we also need it to implement the Sync trait, which essentially says that the object can be accessed from multiple threads at the same time.

    on_produce: Arc<dyn Fn(u8) + Send + Sync>,

Oh, but what's that? Another error.

error[E0495]: cannot infer an appropriate lifetime due to conflicting requirements
  --> src/lib.rs:12:38
   |
12 |           self.run_thread = Some(spawn(move || loop {
   |  ______________________________________^
13 | |             (self.on_produce)(0);
14 | |             sleep(Duration::from_millis(100));
15 | |         }));
   | |_________^
   |
note: first, the lifetime cannot outlive the anonymous lifetime #1 defined on the method body at 11:5...
  --> src/lib.rs:11:5
   |
11 | /     fn run(&self) {
12 | |         self.run_thread = Some(spawn(move || loop {
13 | |             (self.on_produce)(0);
14 | |             sleep(Duration::from_millis(100));
15 | |         }));
16 | |     }
   | |_____^
note: ...so that the types are compatible
  --> src/lib.rs:12:38
   |
12 |           self.run_thread = Some(spawn(move || loop {
   |  ______________________________________^
13 | |             (self.on_produce)(0);
14 | |             sleep(Duration::from_millis(100));
15 | |         }));
   | |_________^
   = note: expected `&A`
              found `&A`
   = note: but, the lifetime must be valid for the static lifetime...
note: ...so that the type `[closure@src/lib.rs:12:38: 15:10 self:&A]` will meet its required lifetime bounds
  --> src/lib.rs:12:32
   |
12 |         self.run_thread = Some(spawn(move || loop {
   |                                ^^^^^

This error says that std::thread::spawn requires the closure to be 'static. The closure, however, borrows self, and self may not live that long.

That being said, what we are borrowing is an Arc, so we can clone it and move the clone into the thread instead.

impl A {
    fn run(&self) {
        let on_produce = Arc::clone(&self.on_produce);
        self.run_thread = Some(spawn(move || loop {
            on_produce(0);
            sleep(Duration::from_millis(100));
        }));
    }
}

The spawned thread no longer depends on the lifetime of &self, so the thread function itself is 'static. Everything is fine, right?

error[E0594]: cannot assign to `self.run_thread` which is behind a `&` reference
  --> src/lib.rs:13:9
   |
11 |     fn run(&self) {
   |            ----- help: consider changing this to be a mutable reference: `&mut self`
12 |         let on_produce = Arc::clone(&self.on_produce);
13 |         self.run_thread = Some(spawn(move || loop {
   |         ^^^^^^^^^^^^^^^ `self` is a `&` reference, so the data it refers to cannot be written

Well, this is easily dealt with by taking an exclusive reference (&mut self), as suggested by the compiler.

impl A {
    fn run(&mut self) {
        let on_produce = Arc::clone(&self.on_produce);
        self.run_thread = Some(spawn(move || loop {
            on_produce(0);
            sleep(Duration::from_millis(100));
        }));
    }
}

Then we can compile...

error[E0507]: cannot move out of `self.run_thread` which is behind a mutable reference
  --> src/lib.rs:22:9
   |
22 |         self.run_thread.unwrap().join();
   |         ^^^^^^^^^^^^^^^
   |         |
   |         move occurs because `self.run_thread` has type `std::option::Option<std::thread::JoinHandle<()>>`, which does not implement the `Copy` trait
   |         help: consider borrowing the `Option`'s content: `self.run_thread.as_ref()`

The signature of Option::unwrap is fn unwrap(self) -> T. It requires ownership of the Option<T>, but here we only have a &mut Option<T>. Since join also requires ownership of the inner JoinHandle, we have to replace the value of run_thread with something else in order to join the thread. Conveniently, Rust provides the take method on Option, which does exactly that: it takes a &mut Option<T>, replaces the value behind the reference with None, and returns the original Option<T>, which we then own and can unwrap or match on.

impl Drop for A {
    fn drop(&mut self) {
        // take() leaves None behind; if let avoids a panic if run() was never called.
        if let Some(handle) = self.run_thread.take() {
            // join() returns a Result; a panic in the thread is ignored here.
            let _ = handle.join();
        }
    }
}
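Putting the steps above together, a complete version might look like the following sketch. Two pieces are assumptions that do not appear in the original code: the stop flag, added because the original loop never ends and a join in drop would otherwise block forever, and the new constructor, added so the struct can be built from outside.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread::{sleep, spawn, JoinHandle};
use std::time::Duration;

pub struct A {
    on_produce: Arc<dyn Fn(u8) + Send + Sync>,
    run_thread: Option<JoinHandle<()>>,
    // Assumption: not in the original struct. Lets drop ask the loop to exit.
    stop: Arc<AtomicBool>,
}

impl A {
    // Hypothetical constructor, added for illustration.
    pub fn new(on_produce: Arc<dyn Fn(u8) + Send + Sync>) -> Self {
        A {
            on_produce,
            run_thread: None,
            stop: Arc::new(AtomicBool::new(false)),
        }
    }

    pub fn run(&mut self) {
        // Clone the Arcs so the thread owns its handles and is 'static.
        let on_produce = Arc::clone(&self.on_produce);
        let stop = Arc::clone(&self.stop);
        self.run_thread = Some(spawn(move || {
            while !stop.load(Ordering::SeqCst) {
                on_produce(0);
                sleep(Duration::from_millis(100));
            }
        }));
    }
}

impl Drop for A {
    fn drop(&mut self) {
        // Signal the loop to finish, then wait for the thread.
        self.stop.store(true, Ordering::SeqCst);
        if let Some(handle) = self.run_thread.take() {
            let _ = handle.join();
        }
    }
}
```

With this shape, dropping an A waits at most roughly one sleep interval for the worker to notice the flag. A channel or a condition variable would be other reasonable ways to signal shutdown.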

IIRC, you can call Option::as_mut to go from &mut Option<T> to Option<&mut T>. Then you can call methods requiring ownership of the Option.

EDIT: I just noticed you require T, not &mut T. Option::take, as you mentioned, is the way to go, then.
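To make the difference between the two methods concrete, here is a small sketch using a String in place of the JoinHandle. The function name as_mut_then_take is made up for illustration.

```rust
/// Hypothetical helper showing Option::as_mut and Option::take side by side.
/// Returns (the value taken out, what is left in the slot).
fn as_mut_then_take(mut slot: Option<String>) -> (Option<String>, Option<String>) {
    // as_mut: &mut Option<String> -> Option<&mut String>.
    // The value stays in `slot`; we can only mutate it in place.
    if let Some(s) = slot.as_mut() {
        s.push('!');
    }

    // take: moves the value out and leaves None behind, giving us ownership.
    let taken = slot.take();
    (taken, slot)
}
```

as_mut is enough for methods that take &mut self on the inner value, while anything that consumes the value, such as JoinHandle::join, needs take.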

Wow, this answer is perfect. It solved all my problems, and I understand better how Rust works now. Thank you very much!