Channel: static lifetime error I get, but can't figure out

I'm getting a static lifetime error when trying to send/receive information using mpsc channel framework.

I thought I was careful in making sure that each thread owns "its share" and when completed, transfers that ownership.

Clearly, I'm missing something. I have read other posts, but can't see "it". Any ideas would be greatly appreciated.

    pub fn run(&mut self) -> &'a mut Inspector<'_> {

        let (tx, rx): (Sender<FieldCounters>, Receiver<FieldCounters>) = mpsc::channel();

        let mut workers = Vec::with_capacity(self.num_workers());

        // ⏰ usize, copied when moved
        // use single value to set the number of counters for each thread
        let num_fields = self.header.as_ref().unwrap().len();
        let num_records = self.in_memory_index.as_ref().unwrap().len();
        let job_size = utils::job_size(num_records as usize, self.num_workers());

        for id in 0..self.num_workers() {
            let thread_tx = tx.clone();

            // ✨
            let worker = thread::spawn(move || {

                // Instantiate what is mine and mine only!
                let mut field_counters = FieldCounters::new(num_fields);
                let mut reader = match reader_at_pos(
                    self.filepath.as_ref().unwrap(),
                    (id * job_size) as u64,
                    self.in_memory_index.as_ref().unwrap(),
                ) {
                    Err(e) => {
                        panic!("Reader failed for thread: {}", id)
                    }
                    Ok(reader) => reader,
                };

                // 🧮 count_fields_in_file
                // side-effect of updating the field_counters
               match count_fields_in_file(&mut field_counters, &mut reader, Some(job_size)) {
                    Err(e) => println!("Counter failed for thread: {}", id),
                    Ok(_) => println!(
                        "Counter success for thread: {}; last record pos: {}",
                        &id,
                        (job_size * id)
                    ),
                };

                thread_tx.send(field_counters).unwrap();
            });

            // collection of active workers (worker: JoinHandle)
            workers.push(worker);
        }

        println!(
            "Inspector counts starting at: {:?}",
            self.field_counters.as_ref().unwrap()
        );

        [0..self.num_workers()]
            .iter()
            .for_each(|_| match rx.recv() {
                Err(e) => println!("{}", e),
                Ok(counts) => self.field_counters.as_mut().unwrap().combine(&counts),
            });

        for worker in workers {
            worker.join().expect("oops! the worker thread panicked");
        }

        self
    }

And the error:

error[E0477]: the type `[closure@src/inspector.rs:140:40: 167:14]` does not fulfill the required lifetime
   --> src/inspector.rs:140:26
    |
140 |             let worker = thread::spawn(move || {
    |                          ^^^^^^^^^^^^^
    |
    = note: type must satisfy the static lifetime

error[E0495]: cannot infer an appropriate lifetime due to conflicting requirements
   --> src/inspector.rs:140:40
    |
140 |               let worker = thread::spawn(move || {
    |  ________________________________________^
141 | |                 // Instantiate what is mine and mine only!
142 | |                 let mut field_counters = FieldCounters::new(num_fields);
143 | |                 let mut reader = match reader_at_pos(
...   |
166 | |                 thread_tx.send(field_counters).unwrap();
167 | |             });
    | |_____________^
    |
note: first, the lifetime cannot outlive the lifetime `'a` as defined on the impl at 36:6...
   --> src/inspector.rs:36:6
    |
36  | impl<'a> Inspector<'a> {
    |      ^^
note: ...so that the reference type `&mut Inspector<'a>` does not outlive the data it points at
   --> src/inspector.rs:140:26
    |
140 |             let worker = thread::spawn(move || {
    |                          ^^^^^^^^^^^^^
    = note: but, the lifetime must be valid for the static lifetime...
note: ...so that the type `[closure@src/inspector.rs:140:40: 167:14]` will meet its required lifetime bounds
   --> src/inspector.rs:140:26
    |
140 |             let worker = thread::spawn(move || {
    |                          ^^^^^^^^^^^^^

Where and what is the error exactly? Could you post a minimal example on the playground?

1 Like

Thank you! I updated the post with the error message.

thread::spawn requires that the closure you give it is 'static. The reason copy pasted from the API docs:

The 'static constraint means that the closure and its return value must have a lifetime of the whole program execution. The reason for this is that threads can detach and outlive the lifetime they have been created in. Indeed if the thread, and by extension its return value, can outlive their caller, we need to make sure that they will be valid afterwards, and since we can't know when it will return we need to have them valid as long as possible, that is until the end of the program, hence the 'static lifetime.

This is a problem because 'static can outlive the lifetime 'a that you have.
The solution is to use crossbeam::scope. It creates a scope that you can spawn threads in and since the scope will make sure that all threads are joined before the scope() call ends, the closures don't need to be 'static.


Speaking of crossbeam, you should see the documentation in crossbeam::channel if you're interested:

Multi-producer multi-consumer channels for message passing.

This crate is an alternative to std::sync::mpsc with more features and better performance.

1 Like

I can read, and re-read that documentation and get it. At the same time, I can't imagine not being able to spawn a thread, here and there, throughout time, without each thread having to live as long as my app. Your answer regarding how to scope the threads makes perfect sense. When I say "here and there", I mean, in this and that scope I should be able to accomplish the task. Thank you...

The more I think about it, the more I think the docs should make it clear: "have a lifetime of the whole program execution"... maybe it should read "have a lifetime of the scope in which... [fill in the blank]".

I have the code up and running thanks to the input from everyone in this stream. Here is a summary "debrief" I hope is helpful not just for me as I consolidate what was learned.

Changed &mut to & and return ownership of the output

I was using a fn set_value(&mut self, input: xyz) -> &mut Self { /*...*/ } pattern of building my app. That pattern fails when I want to include a multi-thread approach to update a value hosted on Self.

// before 
fn fn set_value(&mut self, input: xyz) -> &mut Self { /*...*/ }

// after
pub fn run(&self) -> FieldCounters { /* do it */ }

Create a separate scope to "hide" the remaining borrow

My main "hosting" struct also had a &'a str value. This would get in the way of using a thread that could take-on any lifetime, "any" means (aka static) including a lifetime that might live longer than the <'a>. That took some time to rectify with my experience with how we all experience the use of threads. I should be able to have them come and go. The trick to accomplish what I intuit, and what the borrow checker needs, was using scoped threads. So within this new scope, the <'a> is for all practical purposes "out of sight" (of course not something I could accomplish if I continued using &mut self).

Set the bound size considering the number of clones made

I ended up using the crossbeam package. The only gotcha, was making sure that I model the idea of cloning the channel for each worker as "sharing" the channel (counter intuitive, for the purpose of cloning). However, the distinction ensures that I instantiate the channel with a "bound" large enough to accommodate the total number of workers. In my case, each worker was expected to send one message. The "gotcha" is setting the channel to a bound of one.

Debugging FYI here: Matching on the send Result with an informative print message helped me catch the fact that I was sending messages in a channel with insufficient "capacity" .

Use a separate scope to update self with the work product

I created an owned ref to host the combined output (messages) from each channel; possible with the treads now done/gone. I consumed that ownership to "finally" &mut self.

Thank you

Thank you to everyone who supports this wonderful Rust resource, and to everyone who provided the input that helped me get enough of a grip on the situation to accomplish the task.

Summary of the code itself

    pub fn run(&self) -> FieldCounters {
        // scope for the thread
        thread::scope(|s| {
            // use many times, including setting the channel.bound value
            let num_workers = self.num_workers();
           
            // what each thread needs to specify their share of the work
            let num_fields = self.header.as_ref().unwrap().len();
            // ...etc

            let (sender, receiver) = channel::bounded(num_workers);

            for (i, job_start) in job_starts.into_iter().enumerate() {
                // clone to share the channel; set bound accordingly
                let sender = sender.clone();

                // ✨
                s.spawn(move |_| {
                    // Instantiate what is mine and mine only!
                    let mut field_counters = FieldCounters::new(num_fields);
                    // ...etc

                    // 🧮 count_fields_in_file
                    // side-effect of updating the field_counters
                    match count_fields_in_file(&mut field_counters, &mut reader, Some(job_size)) {
                        Err(e) => println!("Counter failed for thread: {}\n{}", i, e),
                        Ok(_) => {
                            println!("-------------------------");
                            println!(
                                "👉 Counter success\nThread: {:02}\nRecords: {}-{}",
                                &i,
                                (job_size * i),
                                (job_size * i) + job_size,
                            );
                            println!("{}", &field_counters);
                            println!("Sample from field 3:\n{}", field_counters.get(3).unwrap(),);
                        }
                    };
                    /* Was useful for debugging
                    match sender.send(field_counters) {
                        Err(e) => println!("{}", e),
                        Ok(()) => println!("Send ok: {}", i),
                    }; */
                    sender.send(field_counters).unwrap();
                });
            }
            // combine the results
            let mut results = FieldCounters::new(num_fields);
            for _ in 0..num_workers {
                results.combine(receiver.recv().unwrap());
            }
            results
        })
        .unwrap()
    }

- E