Passing closure received as argument to thread

Dear all, I experimented with threads from the standard library and stumbled upon a problem that appeared pretty strange to me. Consider the following program:

use std::sync::{Arc, Mutex};
use std::thread;

fn make_closure() -> impl FnMut() -> () {
    let mut x = 0;
    move || {
        x += 1;
        println!("Function executed #{}", x);
    }
}

fn subfunc<F: FnMut() -> ()>(f: F) {
    // Adding the following line allows the program to compile and run:
    // let f = make_closure();
    let arc_parent = Arc::new(Mutex::new(f));
    let arc_child = arc_parent.clone();
    let t = thread::spawn(move || {
        let mut f = arc_child.lock().unwrap();
        f();
    });
    t.join().unwrap();
    let mut f = arc_parent.lock().unwrap();
    f();
}

fn main() {
    let f = make_closure();
    subfunc(f);
}

Note that subfunc consumes its argument f. The program fails to compile with the following error message:

   Compiling threadtest v0.1.0 (/usr/home/jbe/prj/rust/threadtest)
error[E0277]: `F` cannot be sent between threads safely
   --> src/main.rs:17:13
    |
17  |     let t = thread::spawn(move || {
    |             ^^^^^^^^^^^^^ `F` cannot be sent between threads safely
    |
    = note: required because of the requirements on the impl of `Send` for `Mutex<F>`
    = note: required because of the requirements on the impl of `Send` for `Arc<Mutex<F>>`
    = note: required because it appears within the type `[closure@src/main.rs:17:27: 20:6]`
help: consider further restricting this bound
    |
12  | fn subfunc<F: FnMut() -> () + Send>(f: F) {
    |                             ^^^^^^

error: aborting due to previous error

For more information about this error, try `rustc --explain E0277`.
error: could not compile `threadtest`

To learn more, run the command again with --verbose.

If I change the signature of subfunc as suggested by the compiler (i.e. add + Send to the trait bounds on F), the error message becomes:

   Compiling threadtest v0.1.0 (/usr/home/jbe/prj/rust/threadtest)
error[E0310]: the parameter type `F` may not live long enough
  --> src/main.rs:17:13
   |
12 | fn subfunc<F: FnMut() -> () + Send>(f: F) {
   |            -- help: consider adding an explicit lifetime bound...: `F: 'static +`
...
17 |     let t = thread::spawn(move || {
   |             ^^^^^^^^^^^^^ ...so that the type `[closure@src/main.rs:17:27: 20:6]` will meet its required lifetime bounds

error: aborting due to previous error

For more information about this error, try `rustc --explain E0310`.
error: could not compile `threadtest`

To learn more, run the command again with --verbose.

However, if I uncomment the line let f = make_closure();, i.e. if I do not use the passed closure but create the closure within the function, then the program compiles and generates the following (desired) output:

Function executed #1
Function executed #2

I'm stuck here. Why does the program only work when I use the closure created within the function subfunc?

If I create the closure in main and pass it by moving (i.e. transferring ownership to subfunc, as in my first example above), I'd expect the program to work as well. Where is the difference in lifetime guarantees? Maybe the compiler fails to notice something?

Any help is appreciated. I use rustc version 1.48.0.


This happens because:

  • impl Trait in return position leaks the auto traits (Send/Sync) of the concrete type, and in your case the closure implements both.
  • impl Trait in return position can only capture the generic parameters (including lifetimes) of the function. make_closure has none, so the returned type is 'static.

So when you uncomment that line, the new f is known to be 'static + Send + Sync, and 'static + Send is exactly what the original F was missing in order to be passed to thread::spawn.
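
A minimal sketch of that leakage, for illustration only. run_on_thread is a hypothetical helper that is not part of the original program, and make_closure is changed here to return the counter so the result can be inspected:

```rust
use std::thread;

// Same shape as `make_closure` from the question: the closure owns `x`,
// so its hidden concrete type is `Send` and contains no borrows.
fn make_closure() -> impl FnMut() -> i32 {
    let mut x = 0;
    move || {
        x += 1;
        x
    }
}

// Hypothetical helper: only accepts closures that may be moved to another thread.
fn run_on_thread<F: FnMut() -> i32 + Send + 'static>(mut f: F) -> i32 {
    thread::spawn(move || f()).join().unwrap()
}

fn main() {
    // Accepted although the signature of `make_closure` never mentions
    // `Send` or `'static`: the compiler looks through `impl Trait` for
    // auto traits, and the function has no generic parameters to capture.
    let f = make_closure();
    println!("{}", run_on_thread(f));
}
```

A generic parameter F, by contrast, only promises what its bounds spell out, which is why subfunc has to state Send and 'static itself.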


To expand on what @SkiFire13 mentioned, you can fix this by changing the requirements for the generic type F in subfunc:

fn subfunc<F: 'static + FnMut() -> () + Send>(f: F) {
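
For reference, a sketch of the complete program with that bound applied. Everything except the changed signature follows the code from the question; the closure in main is inlined here and is otherwise an arbitrary example:

```rust
use std::sync::{Arc, Mutex};
use std::thread;

// `Send + 'static` are the bounds `thread::spawn` needs for everything
// that is moved into the new thread.
fn subfunc<F: FnMut() + Send + 'static>(f: F) {
    let arc_parent = Arc::new(Mutex::new(f));
    let arc_child = Arc::clone(&arc_parent);
    let t = thread::spawn(move || {
        let mut f = arc_child.lock().unwrap();
        f();
    });
    t.join().unwrap();
    let mut f = arc_parent.lock().unwrap();
    f();
}

fn main() {
    let mut x = 0;
    // The closure owns `x`, so it satisfies `'static`.
    subfunc(move || {
        x += 1;
        println!("Function executed #{}", x);
    });
}
```

The 'static bound means a closure that borrows local variables of the caller is rejected, so this only fits closures that own their captured state.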

Thanks, I didn't realize that f returned by make_closure is 'static. I still haven't understood the concept of 'static very well yet, it seems.

Okay, thanks. That's also what the compiler suggested. However, I think a 'static lifetime might be too restrictive for my purposes. I'll have to experiment with that.

Many thanks for your responses, those helped a lot already!
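
On the meaning of 'static as a bound, a small sketch may help. call_static is a hypothetical helper introduced only for this illustration. The point it tries to show is that F: 'static restricts what the closure type may borrow, not how long the closure value lives:

```rust
// Hypothetical helper: accepts any closure whose type contains no borrows
// shorter than `'static`.
fn call_static<F: FnMut() -> i32 + 'static>(mut f: F) -> i32 {
    f()
}

fn main() {
    let mut counter = 10;

    // This closure owns its copy of `counter` (moved in), so its type is
    // `'static`, even though the closure itself is dropped right after the call.
    let owning = move || {
        counter += 1;
        counter
    };
    println!("{}", call_static(owning));

    // This closure borrows a local variable, so its type is tied to that
    // borrow and is not `'static`. Passing it to `call_static` is rejected
    // by the compiler, which is why the call is commented out.
    let mut local = 0;
    let mut borrowing = || {
        local += 1;
        local
    };
    // call_static(borrowing);
    println!("{}", borrowing());
}
```

This is also where the bound becomes restrictive: a closure that needs to mutate data owned by the caller through a reference cannot be passed to thread::spawn directly.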

Seeing that you're joining the thread before returning you might be interested in rayon::scope or crossbeam::scope.


I do have some knowledge about when the thread is joined, and I actually experimented with crossbeam already, but didn't know about rayon yet. I might have a look at both again.

In my case, I wanted to store the scope in a struct and then execute methods on that struct. It seemed possible at first, but then I ran into a lot of complicated issues regarding lifetimes. My problem might be solvable through scopes, but it became quite complex and difficult to keep an overview of (for me, at least).

I might also like to figure out how crossbeam and rayon achieve the scopes. Understanding that might help me to handle those scopes better when storing them in structs. I assume unsafe Rust is needed for that.

Update:
I just experimented a bit with pointers and unsafe, and got a (likely ugly but working) solution:

use std::sync::{Arc, Mutex};
use std::thread;

struct Carry {
    ptr: usize,
}

unsafe impl Send for Carry {}

struct Data {
    counter: i32,
}

fn subfunc<F: FnMut() -> () + Send>(f: F) {
    let arc_parent = Arc::new(Mutex::new(f));
    let arc_child = arc_parent.clone();
    let carry = Carry {
        ptr: &arc_child as *const _ as usize,
    };
    let t = thread::spawn(move || {
        let arc_child = unsafe { &*(carry.ptr as *const Arc<Mutex<F>>) };
        let mut f = arc_child.lock().unwrap();
        f();
    });
    t.join().unwrap();
    let mut f = arc_parent.lock().unwrap();
    f();
}

fn main() {
    let mut data = Data { counter: 0 };
    let f = || {
        println!("Hello {}", data.counter);
        data.counter += 1;
    };
    subfunc(f);
    println!("Ending: {}", data.counter);
}

There might be easier ways to achieve this (converting the pointer to an integer doesn't seem very straightforward, and neither does wrapping it in a struct that implements Send), plus I'm not even sure if my code is correct or causes any dangers. But I wanted to share it with you anyway (this was my first attempt at unsafe Rust) :see_no_evil:

Feel free to correct me or show me an easier solution (using just the standard library, I mean). I really love Rust, just the threads kinda blow my mind… It isn't as easy for me as many sources suggest it is.

Update 2:
I simplified the code a bit to this:

use std::sync::Mutex;
use std::thread;

struct Data {
    counter: i32,
}

fn subfunc<F: FnMut() -> () + Send + Sync>(f: F) {
    let mut mutex = Mutex::new(f);
    let addr = &mut mutex as *mut Mutex<F> as usize;
    let t = thread::spawn(move || {
        let mutex = unsafe { &mut *(addr as *mut Mutex<F>) };
        let mut f = mutex.lock().unwrap();
        f();
    });
    t.join().unwrap();
    let mut f = mutex.lock().unwrap();
    f();
}

fn main() {
    let mut data = Data { counter: 0 };
    let f = || {
        println!("Hello {}", data.counter);
        data.counter += 1;
    };
    subfunc(f);
    println!("Ending: {}", data.counter);
}

I removed the Arc, as it's known when the mutex is to be dropped (at the end of subfunc). I could also get rid of the separate struct marked as Send because I simply pass the pointer as a usize directly.

What I don't know is whether it's necessary to require F to be Send + Sync or whether Send would be sufficient (or whether there are any other problems with my approach, e.g. the main thread panicking and cleaning up the mutex while it's still used by the child thread).

Many thanks for referring me to rayon. I just read the docs. It seems to handle creating a thread pool and assigning tasks to threads fully automatically. It may be exactly what I need for performing parallel calculations.
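
Regarding the Send vs. Send + Sync question above, a compile-time sketch. assert_send and assert_sync are hypothetical helper functions written for this illustration. The standard library implements Sync for Mutex<T> whenever T is Send, which the following checks with a type that is Send but not Sync:

```rust
use std::cell::Cell;
use std::sync::Mutex;

// Hypothetical compile-time checks: a call only compiles if the type
// argument satisfies the bound.
fn assert_send<T: Send>() {}
fn assert_sync<T: Sync>() {}

fn main() {
    // `Cell<i32>` is `Send` but not `Sync`.
    assert_send::<Cell<i32>>();
    // assert_sync::<Cell<i32>>(); // rejected by the compiler

    // Wrapped in a `Mutex` it becomes `Sync`, because `Mutex<T>` is `Sync`
    // for every `T: Send`.
    assert_sync::<Mutex<Cell<i32>>>();
    println!("Mutex<Cell<i32>> is Sync");
}
```

So for sharing a Mutex<F> by reference between threads, F: Send is the relevant bound. This says nothing about whether the raw-pointer cast itself is sound.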

If you are encountering lifetime issues you don't understand, you definitely should not try to work around them using unsafe. Use unsafe when you do understand the lifetime issues and are certain the compiler is being overly conservative. In your case I don't see a way it can go wrong, but you're throwing out a lot of safeguards by casting the pointer to usize so I'm not sure.

Anyway, it's easy to do safely with scoped threads:

use std::sync::Mutex;
use crossbeam_utils::thread;

fn subfunc<F: FnMut() + Send>(f: F) {
    let mutex = Mutex::new(f);
    thread::scope(|scope| {
        scope.spawn(|_| {
            let mut f = mutex.lock().unwrap();
            f();
        });
    }).expect("A spawned thread panicked");
    let mut f = mutex.lock().unwrap();
    f();
}

You can even do it without the mutex if you move f into the thread and back out when you're done with it (which is one way of looking at what Mutex does):

use crossbeam_utils::thread;

fn subfunc<F: FnMut() + Send>(mut f: F) {
    let mut f = thread::scope(move |scope| {
        scope.spawn(move |_| {
            f();
            f
        }).join()
    })
    .and_then(|y| y) // flattens two layers of `Result`: the one returned from `join` and the one from `thread::scope`
    .expect("A spawned thread panicked");
    f();
}
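
For a solution using only the standard library: assuming a toolchain of Rust 1.63 or later (newer than the 1.48 mentioned in the question), std::thread::scope offers the same pattern without an external crate. A sketch, reusing the Data example from above:

```rust
use std::sync::Mutex;
use std::thread;

struct Data {
    counter: i32,
}

// Assumes Rust 1.63+, where `std::thread::scope` is available.
// No `'static` bound is needed on `F`.
fn subfunc<F: FnMut() + Send>(f: F) {
    let mutex = Mutex::new(f);
    thread::scope(|scope| {
        // The scoped thread may borrow `mutex`, because `thread::scope`
        // joins all spawned threads before it returns.
        scope.spawn(|| {
            let mut f = mutex.lock().unwrap();
            f();
        });
    });
    let mut f = mutex.lock().unwrap();
    f();
}

fn main() {
    let mut data = Data { counter: 0 };
    // The closure borrows `data` mutably; this would be rejected by a
    // `'static` bound but works with scoped threads.
    subfunc(|| {
        println!("Hello {}", data.counter);
        data.counter += 1;
    });
    println!("Ending: {}", data.counter);
}
```

Unlike the crossbeam version, the closure passed to scope.spawn takes no argument, and thread::scope does not return a Result; a panic in a scoped thread that was not joined manually makes thread::scope itself panic.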

You're right. :sweat_smile:

Anyway, it was a nice incentive to try experimenting with unsafe for the first time. Didn't plan to use that for any production code.

I did take a look at crossbeam before, but I want to be able to interrupt task execution for inspection of intermediate results by the caller. I tried to use closures for that, but things got pretty complex quickly, especially as I didn't want to spawn new threads for every calculation round.

For me, rayon (as mentioned by @SkiFire13 above) seems to be more suitable, because rayon will create a thread pool automatically. This allows me to spawn fine-grained tasks because (if I understood it correctly) a spawn in rayon won't actually create a thread but just transparently pass a task to one thread in the already existing pool. This would reduce overhead if I spawn a lot of times during my calculations.

See for_each of rayon's ParallelIterator for example.
