Is There a Reason Not to Use a Crossbeam Scope in a Crossbeam Scope?

I just wanted to make sure that there was no reason not to do something like the following:

use crossbeam_utils::thread;

fn main() {
    
    let var = vec![1, 2, 3];
    thread::scope(|s| {
        
        s.spawn(|_| {
            let var2 = vec![3,2,1];
            
            thread::scope(|_| {
                println!("A child thread borrowing `var`: {:?}", var);
                println!("A child thread borrowing `var2`: {:?}", var2);
            })
        
        });
    }).unwrap();
}


Output:

A child thread borrowing `var`: [1, 2, 3]
A child thread borrowing `var2`: [3, 2, 1]

To clarify, it does work. The reason I am asking is that the argument to s.spawn's closure is a scope object that can be used to spawn nested threads. But if I use that nested s.spawn() instead of creating another thread::scope, I can't access var2:

use crossbeam_utils::thread;

fn main() {
    
    let var = vec![1, 2, 3];
    thread::scope(|s| {
        
        s.spawn(|s| {
            // Var2 can't be referenced inside of the closure below
            let var2 = vec![3,2,1];
            
            s.spawn(|_| {
                println!("A child thread borrowing `var`: {:?}", var);
                println!("A child thread borrowing `var2`: {:?}", var2);
            })
        
        });
    }).unwrap();
}


I was just trying to figure out: given that s.spawn() exists for spawning nested threads, is there a reason not to just create a new thread scope, as I seem to need to do in this example?

That should be fine. The thread that created the inner scope will be blocked, waiting for all of its spawned threads to complete, but that's the price of borrowing var2.
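A minimal sketch of what this reply describes. It uses the standard library's std::thread::scope (stable since Rust 1.63, so newer than this thread) as a stand-in for crossbeam_utils::thread::scope; in the std version the spawn closures take no scope argument, and scope returns the closure's value directly rather than a Result. The function name nested_scope_debug is made up for illustration. The outer spawned thread owns var2 and opens an inner scope; it stays blocked inside that inner scope call until both nested threads have been joined, which is what makes borrowing var2 sound.

```rust
use std::thread;

/// Hypothetical helper: an outer scoped thread opens its own inner scope so
/// that nested threads can borrow `var2`, a local of the outer thread.
/// Returns the Debug output produced by each nested thread.
fn nested_scope_debug(var: &[i32]) -> (String, String) {
    thread::scope(|s| {
        let outer = s.spawn(|| {
            // `var2` lives on the outer spawned thread's stack.
            let var2 = vec![3, 2, 1];
            // This call does not return until both inner threads are done,
            // so the borrows of `var2` cannot outlive it.
            thread::scope(|inner| {
                let a = inner.spawn(|| format!("{:?}", var));
                let b = inner.spawn(|| format!("{:?}", var2));
                (a.join().unwrap(), b.join().unwrap())
            })
        });
        outer.join().unwrap()
    })
}
```

Calling nested_scope_debug(&[1, 2, 3]) returns the strings "[1, 2, 3]" and "[3, 2, 1]".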


Why can't you move var2 outside of the closure? You shouldn't need a nested scope in that case.

use crossbeam_utils::thread;

fn main() {
    
    let var = vec![1, 2, 3];
    let mut var2 = vec![3,2,1];
    
    thread::scope(|s| {
        
        s.spawn(|s| {
            // Var2 can't be referenced inside of the closure below
            let var2 = &mut var2;
            
            s.spawn(|_| {
                println!("A child thread borrowing `var`: {:?}", var);
                println!("A child thread borrowing `var2`: {:?}", var2);
            })
        
        });
    }).unwrap();
}

In my actual case I'm looping and spawning multiple nested threads, so I can't move var2 in because I need it afterwards.

And that second example didn't compile because the closure below var2's definition could "outlive the current function".

Edit: actually the error message was much more complicated than that:

error[E0495]: cannot infer an appropriate lifetime for autoref due to conflicting requirements
  --> src/main.rs:11:15
   |
11 |             s.spawn(|_| {
   |               ^^^^^
   |
note: first, the lifetime cannot outlive the anonymous lifetime #2 defined on the body at 8:17...
  --> src/main.rs:8:17
   |
8  |           s.spawn(|s| {
   |  _________________^
9  | |             let var2 = vec![3,2,1];
10 | |             
11 | |             s.spawn(|_| {
...  |
15 | |         
16 | |         });
   | |_________^
note: ...so that reference does not outlive borrowed content
  --> src/main.rs:11:13
   |
11 |             s.spawn(|_| {
   |             ^
note: but, the lifetime must be valid for the method call at 8:9...
  --> src/main.rs:8:9
   |
8  | /         s.spawn(|s| {
9  | |             let var2 = vec![3,2,1];
10 | |             
11 | |             s.spawn(|_| {
...  |
15 | |         
16 | |         });
   | |__________^
note: ...so type `crossbeam_utils::thread::ScopedJoinHandle<'_, crossbeam_utils::thread::ScopedJoinHandle<'_, ()>>` of expression is valid during the expression
  --> src/main.rs:8:9
   |
8  | /         s.spawn(|s| {
9  | |             let var2 = vec![3,2,1];
10 | |             
11 | |             s.spawn(|_| {
...  |
15 | |         
16 | |         });
   | |__________^

Just tried it: that's because you are returning a join guard out of the second thread's closure. Add a semicolon. Also, the let var2 = &mut var2; line causes some lifetime errors because of how the closure desugars (the unique reference isn't coerced), so I removed that too:

use crossbeam_utils::thread;

fn main() {
    
    let var = vec![1, 2, 3];
    let mut var2 = vec![3,2,1];
    
    thread::scope(|s| {
        
        s.spawn(|s| {
            // no `let var2 = &mut var2;` here
            s.spawn(|s| {
                println!("A child thread borrowing `var`: {:?}", var);
                println!("A child thread borrowing `var2`: {:?}", var2);
            }); // semicolon here, to prevent the join guard from living too long
        
        });
    }).unwrap();
}

If you need to mutably access var2 after spawning the second thread, then yes, you will need a nested scope.
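A sketch of that nested-scope case, again with std::thread::scope standing in for crossbeam_utils and a hypothetical function name, read_then_mutate. The outer thread lends var2 to a reader thread inside an inner scope and only mutates var2 after that inner scope has returned, at which point the shared borrow is over.

```rust
use std::thread;

/// Hypothetical helper: lend `var2` to a reader thread, then mutate it
/// once the inner scope guarantees the reader has finished.
fn read_then_mutate() -> Vec<i32> {
    let var = vec![1, 2, 3];
    let mut var2 = vec![3, 2, 1];

    thread::scope(|s| {
        s.spawn(|| {
            // Inner scope: returns only after the reader thread has joined.
            thread::scope(|inner| {
                inner.spawn(|| {
                    println!("A child thread borrowing `var`: {:?}", var);
                    println!("A child thread borrowing `var2`: {:?}", var2);
                });
            });
            // The shared borrow of `var2` has ended, so mutation is allowed.
            var2.push(0);
        });
    });

    var2
}
```

Here read_then_mutate() returns vec![3, 2, 1, 0].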


But in that example, var2 is instantiated outside of the initial scope. In my use case, var2 is instantiated inside the first scope, though it doesn't need to be mutable:

use crossbeam_utils::thread;

fn main() {
    
    let var = vec![1, 2, 3];
    
    thread::scope(|s| {
        // var2 is actually defined here
        let var2 = vec![3,2,1];
        s.spawn(|s| {
            s.spawn(|_| {
                println!("A child thread borrowing `var`: {:?}", var);
                println!("A child thread borrowing `var2`: {:?}", var2);
            });
        
        });
    }).unwrap();
}

That doesn't work.

You can do

use crossbeam_utils::thread;

fn main() {
    let var = vec![1, 2, 3];
    let mut var2 = Vec::new();
    
    thread::scope(|s| {
        s.spawn(|s| {
            var2 = vec![3, 2, 1];

            s.spawn(|s| {
                println!("A child thread borrowing `var`: {:?}", var);
                println!("A child thread borrowing `var2`: {:?}", var2);
            });
        
        });
    }).unwrap();
}

Or use Option if you have a more complex data structure that doesn't have a nice default.
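A sketch of the Option variant, assuming std::thread::scope in place of crossbeam_utils and a made-up function name, fill_in_thread. var2 starts as None outside the scope. The first spawned thread fills it with Option::insert, which stores the value and returns a reference to it, and hands that reference to a nested thread spawned on the same scope. The references are rebound before the move closures so that only the references, plus the Copy scope handle s, are moved into the threads.

```rust
use std::thread;

/// Hypothetical helper: `var2` is declared as `None` outside the scope,
/// filled inside a spawned thread, then borrowed by a nested thread.
fn fill_in_thread() -> Option<Vec<i32>> {
    let var = vec![1, 2, 3];
    let mut var2: Option<Vec<i32>> = None;

    thread::scope(|s| {
        // Rebind as references so the `move` closures copy references
        // instead of taking ownership of the vectors.
        let var = &var;
        let var2 = &mut var2;
        s.spawn(move || {
            // Store the value and keep a shared reference to it.
            let var2: &Vec<i32> = var2.insert(vec![3, 2, 1]);
            s.spawn(move || {
                println!("A child thread borrowing `var`: {:?}", var);
                println!("A child thread borrowing `var2`: {:?}", var2);
            });
        });
    });

    // All threads have been joined, so `var2` is usable again here.
    var2
}
```

After the scope returns, fill_in_thread() yields Some(vec![3, 2, 1]).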

I can't tell right off if that would work in my situation. Here's the actual code:

// Loop through cron jobs and run them if necessary
for (schedule, scripts) in &self.lucky_metadata.cron_jobs {
    let schedule: cron::Schedule = handle_err!(schedule.parse(), call);

    // If this job should be run
    if let Some(date) = schedule.after(&last_cron_tick).next() {
        if date < now {
            // Run the job in its own thread
            thread_scope(|s| {
                s.spawn(|_| {
                    // For every script in the job
                    for script in scripts {
                        let hook_name = "cron";

                        // helper to run the script
                        macro_rules! run_script {
                            () => {
                                if let Err(e) = tools::run_charm_script(
                                    &self,
                                    hook_name,
                                    &script,
                                    environment,
                                ) {
                                    job_sender
                                        .send(Err(e))
                                        .expect("Channel dropped prematurely");
                                    return Ok(());
                                }

                                // If docker is enabled, update container configuration
                                if self.lucky_metadata.use_docker {
                                    if let Err(e) = tools::apply_container_updates(self) {
                                        job_sender
                                            .send(Err(e))
                                            .expect("Channel dropped prematurely");
                                        return Ok(());
                                    }
                                }
                            };
                        }

                        // If the script is asynchronous
                        if script.is_async {
                            // Spawn it in another thread
                            thread_scope(|s| {
                                s.spawn(|_| {
                                    run_script!();
                                    Ok::<(), Void>(())
                                });
                            })
                            .expect("Panic in scoped thread");

                        // If the script is synchronous
                        } else {
                            // Run it in place
                            run_script!();
                        }
                    }

                    Ok::<(), Void>(())
                });
            })
            .expect("Panic in scoped thread");
        }
    }
}

I'm iterating over cron schedules, spawning each cron job in its own thread, and potentially spawning asynchronous scripts for each cron job.

Let me make sure I got this right:

  • script is free to be moved into the new thread, because it's thrown away after it's run
  • hook_name looks like a constant
  • environment is something from outside the first scope that probably should not be moved into the sub-threads
  • job_sender is some channel (hopefully one of crossbeam's MPMC channels)
    • if it is, it will be cheap to clone and send to each thread, but we shouldn't need to
  • self lives outside the first scope, and probably shouldn't be moved in

Is that all right?

That's all right.

Is self a &Self?

Yep

Ok, in that case you could do something like

let environment = &environment;
let job_sender = &job_sender;

Put that right before the macro, and change the if to:

// If the script is asynchronous
if script.is_async {
    // Spawn it in another thread
    s.spawn(move |_| {
        // move closure: this is why I asked you to rebind
        // `environment` and `job_sender` as references, so that
        // they don't get moved into the sub-threads
        run_script!();
        Ok::<(), Void>(())
    });

// If the script is synchronous
} else {
    // Run it in place
    run_script!();
}
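The rebinding trick in isolation, as a hedged sketch: std::thread::scope and std::sync::mpsc stand in for crossbeam_utils and the channel in the original code, and run_script and run_async_scripts are hypothetical names. move is required because script is a fresh local on every loop iteration; rebinding environment and job_sender as references first means the move closures copy those references instead of taking ownership of the HashMap and the Sender. Sharing a &Sender across threads assumes Rust 1.72 or later, where std's Sender implements Sync.

```rust
use std::collections::HashMap;
use std::sync::mpsc;
use std::thread;

// Hypothetical stand-in for tools::run_charm_script: reads the environment
// and fails for a script named "fail".
fn run_script(script: &str, environment: &HashMap<String, String>) -> Result<String, String> {
    let ctx = environment
        .get("JUJU_CONTEXT_ID")
        .map(String::as_str)
        .unwrap_or("none");
    if script == "fail" {
        Err(format!("{script} failed in {ctx}"))
    } else {
        Ok(format!("{script} ran in {ctx}"))
    }
}

fn run_async_scripts(scripts: &[&str]) -> Vec<Result<String, String>> {
    let mut environment = HashMap::new();
    environment.insert("JUJU_CONTEXT_ID".to_string(), "ctx-1".to_string());
    let (job_sender, job_receiver) = mpsc::channel();

    thread::scope(|s| {
        // Rebind the owned values as shared references; each `move`
        // closure below copies these references, not the values.
        let environment = &environment;
        let job_sender = &job_sender;
        for &script in scripts {
            s.spawn(move || {
                job_sender
                    .send(run_script(script, environment))
                    .expect("Channel dropped prematurely");
            });
        }
    });

    // Drop the original Sender so the receiver's iterator can finish.
    drop(job_sender);
    job_receiver.iter().collect()
}
```

Results arrive in whatever order the threads finish, so sort them before comparing if order matters.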

Ah, OK, yeah. I did let job_sender = &job_sender, and I happened to already be doing let environment = &environment. The final result (I'll include the whole function this time for clarity):

fn cron_tick(
    &self,
    call: &mut dyn rpc::Call_CronTick,
    juju_context_id: String,
) -> varlink::Result<()> {
    // Set the Juju context
    std::env::set_var("JUJU_CONTEXT_ID", &juju_context_id);

    log::trace!("Cron tick");

    // Create environment map
    let mut environment: HashMap<String, String> = HashMap::new();
    environment.insert("JUJU_CONTEXT_ID".into(), juju_context_id);
    // Make environment a reference (so it can be used in threads)
    let environment = &environment;

    // Get the last cron tick time and the current time
    let mut last_cron_tick = self.last_cron_tick.lock().unwrap();
    let now = Local::now();

    // Create a channel used to transfer our job results from their threads
    let (job_sender, job_receiver) = unbounded_channel();

    // Loop through cron jobs and run them if necessary
    for (schedule, scripts) in &self.lucky_metadata.cron_jobs {
        let schedule: cron::Schedule = handle_err!(schedule.parse(), call);

        // If this job should be run
        if let Some(date) = schedule.after(&last_cron_tick).next() {
            if date < now {
                // Run the job in its own thread
                let job_sender = &job_sender;
                thread_scope(|s| {
                    s.spawn(|ss| {
                        // For every script in the job
                        for script in scripts {
                            let hook_name = "cron";

                            // helper to run the script
                            macro_rules! run_script {
                                () => {
                                    if let Err(e) = tools::run_charm_script(
                                        &self,
                                        hook_name,
                                        &script,
                                        environment,
                                    ) {
                                        job_sender
                                            .send(Err(e))
                                            .expect("Channel dropped prematurely");
                                        return Ok(());
                                    }

                                    // If docker is enabled, update container configuration
                                    if self.lucky_metadata.use_docker {
                                        if let Err(e) = tools::apply_container_updates(self) {
                                            job_sender
                                                .send(Err(e))
                                                .expect("Channel dropped prematurely");
                                            return Ok(());
                                        }
                                    }
                                };
                            }

                            // If the script is asynchronous
                            if script.is_async {
                                // Spawn it in another thread
                                ss.spawn(move |_| {
                                    run_script!();
                                    Ok::<(), Void>(())
                                });

                            // If the script is synchronous
                            } else {
                                // Run it in place
                                run_script!();
                            }
                        }

                        Ok::<(), Void>(())
                    });
                })
                .expect("Panic in scoped thread");
            }
        }
    }

    // Close the channel
    drop(job_sender);

    // Loop through job results
    for job_result in job_receiver.iter() {
        // Handle any errors
        handle_err!(job_result, call);
    }

    // Update the last cron tick
    *last_cron_tick = Local::now();

    // Unset the Juju context as it will be invalid when the cron tick command exits
    std::env::remove_var("JUJU_CONTEXT_ID");

    // Reply empty
    call.reply()
}

Thanks for the help, @Yato!
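A condensed, self-contained sketch of the same overall pattern (per-job threads, nested threads for async scripts, errors collected over a channel that is dropped before draining). It is not the author's code: std::thread::scope and std::sync::mpsc replace crossbeam_utils and the original channel, and Script, run_script and run_jobs are hypothetical simplifications. Sharing &Sender between threads again assumes Rust 1.72 or later.

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical simplified script type.
struct Script {
    name: &'static str,
    is_async: bool,
}

// Hypothetical stand-in for tools::run_charm_script.
fn run_script(script: &Script) -> Result<(), String> {
    if script.name.starts_with("bad") {
        Err(format!("{} failed", script.name))
    } else {
        Ok(())
    }
}

/// Runs each job on its own scoped thread; async scripts get a nested
/// thread spawned on the same scope. Returns the collected errors.
fn run_jobs(jobs: &[Vec<Script>]) -> Vec<String> {
    let (job_sender, job_receiver) = mpsc::channel();

    thread::scope(|s| {
        let job_sender = &job_sender;
        for scripts in jobs {
            s.spawn(move || {
                for script in scripts {
                    if script.is_async {
                        // `script` borrows from `jobs`, which outlives the scope.
                        s.spawn(move || {
                            if let Err(e) = run_script(script) {
                                job_sender.send(e).expect("Channel dropped prematurely");
                            }
                        });
                    } else if let Err(e) = run_script(script) {
                        job_sender.send(e).expect("Channel dropped prematurely");
                    }
                }
            });
        }
    });

    // Close the channel so the iterator below terminates.
    drop(job_sender);
    job_receiver.iter().collect()
}
```

One design difference worth noting: this sketch opens a single scope around the whole loop, so different jobs can run at the same time. In the cron_tick above, thread_scope is called once per job inside the loop, and a scope only returns after all of its threads (including those spawned through ss) have finished, so each job completes before the next one is started.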

