E0495 unsatisfied lifetime constraints with channel and thread pool

Hello,

It seems so simple, but today it isn't for me. I'm using rayon::ThreadPool with two channels per thread to process data and get the results back. I'm also thinking of rewriting this with the new async features that are available now; do you think that would handle this case more nicely? I'm open to suggestions.

I get the following error for the reduced test case.

error[E0495]: cannot infer an appropriate lifetime for lifetime parameter `'a` due to conflicting requirements
  --> src/main.rs:27:25
   |
27 |         self.thread_data.push(ThreadData {
   |                               ^^^^^^^^^^
   |
note: first, the lifetime cannot outlive the lifetime 'a as defined on the impl at 23:6...
  --> src/main.rs:23:6
   |
23 | impl<'a> Test<'a> {
   |      ^^
   = note: ...so that the expression is assignable:
           expected ThreadData<'a>
              found ThreadData<'_>
   = note: but, the lifetime must be valid for the static lifetime...
note: ...so that the type `[closure@src/main.rs:31:21: 37:4 receiver:std::sync::mpsc::Receiver<std::collections::HashMap<i64, (&mut std::boxed::Box<(dyn Machine + std::marker::Send + 'static)>, std::vec::Vec<&mut std::boxed::Box<(dyn Machine + std::marker::Send + 'static)>>)>>]` will meet its required lifetime bounds
  --> src/main.rs:31:15
   |
31 |         thread_pool.spawn(move || {
   |                     ^^^^^

I don't understand this part of the message: aren't 'a and '_ interchangeable here?
expected ThreadData<'a>
found ThreadData<'_>
= note: but, the lifetime must be valid for the static lifetime...

Note that in my real application (the one I'm trying to compile) the error is slightly different, but I think it is the same problem as here:

error: unsatisfied lifetime constraints
   --> src/test.rs:842:4
    |
329 |   impl<'a> Test<'a> {
    |        -- lifetime `'a` defined here
...
842 |               self.thread_data.push(ThreadData {is_alive,
    |  _____________^
843 | |                 computation_unit: ComputationUnit::from(barrier.elements.clone()),
844 | |                 running_since: now.as_millis(),
845 | |                 receiver,
846 | |                 sender: sender.clone()});
    | |_____________________________________________^ argument requires that `'a` must outlive `'static`

What can I do?

This is the reduced code. If someone could make it compile, I would be very happy. Thank you!

use std;

use std::collections::HashMap;
use std::sync::mpsc::channel;
use std::sync::mpsc::Sender;
use std::vec::Vec;

fn main() {}

pub type MachineId = i64;
pub type MachinePointer<'a> = &'a mut Box<dyn Machine + Send>;

pub trait Machine {}

pub struct Test<'a> {
	pub thread_data: Vec<ThreadData<'a>>,
}

pub struct ThreadData<'a> {
	sender: Sender<HashMap<MachineId, (MachinePointer<'a>, Vec<MachinePointer<'a>>)>>,
}

impl<'a> Test<'a> {
	fn _run(&mut self, thread_pool: &rayon::ThreadPool) {
		let (sender, receiver) = channel();

		self.thread_data.push(ThreadData {
			sender,
		});

		thread_pool.spawn(move || {
			loop {
				let mut _incoming: Vec<HashMap<MachineId, (&mut Box<dyn Machine + Send>, Vec<&mut Box<dyn Machine + Send>>)>> = receiver.try_iter().collect();
				//thread processes incoming data
				//and returns the results via another channel back (not included)
			}
		});

		//get results from thread if there are any
		//or exit
		//_run will be called again later to start new threads or handle results
	}
}

When the compiler says it wants something to be 'static, it means that no temporary references are allowed.

The signature of thread_pool.spawn() is:

pub fn spawn<OP>(&self, op: OP) where OP: FnOnce() + Send + 'static,

So the callback is required to be 'static, which means the closure can't borrow any data from its context at all. No amount of lifetime annotations can work around it; temporary references are forbidden here.

You can share data with the closure using Arc. Otherwise, the data has to be moved into the closure entirely and lose all contact with the outside.
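To illustrate both options, here is a small sketch using std::thread::spawn, whose closure carries the same Send + 'static bound as rayon's ThreadPool::spawn. The spawn_worker function and its data are made up for illustration: the Vec is shared through an Arc, while the String label is moved in and no longer usable by the caller.

```rust
use std::sync::Arc;
use std::thread;

/// Hypothetical worker: *shares* `table` through an Arc and *owns* `label`.
fn spawn_worker(table: &Arc<Vec<u64>>, label: String) -> thread::JoinHandle<String> {
    // Cloning the Arc gives the thread its own owned handle, so the closure
    // borrows nothing from this stack frame and satisfies `Send + 'static`.
    let table = Arc::clone(table);
    thread::spawn(move || {
        // `label` was moved in: the caller can no longer use it.
        let total: u64 = table.iter().sum();
        format!("{label}: {total}")
    })
}

fn main() {
    let table = Arc::new(vec![1, 2, 3]);
    let worker = spawn_worker(&table, String::from("worker 1"));
    println!("{}", worker.join().unwrap());
    // The caller still holds its own handle to the shared data.
    println!("{} entries", table.len());
}
```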

Alternatively, don't use a thread-pool method with such a drastic requirement. If you use rayon::scope, it will allow some references.
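A minimal sketch of the scoped approach, assuming you only need the borrows to last until the threads finish. To stay free of external crates it uses std::thread::scope (stable since Rust 1.63) as a stand-in for rayon::scope; both join every thread spawned inside the scope before returning, which is what lets the closures borrow local data. The run_scoped and process functions and the i64 data are hypothetical.

```rust
use std::collections::HashMap;
use std::sync::mpsc::channel;
use std::thread;

/// Hypothetical per-batch work: mutate the borrowed map, return a summary.
fn process(batch: &mut HashMap<i64, i64>) -> i64 {
    for value in batch.values_mut() {
        *value *= 2;
    }
    batch.values().sum()
}

/// Each scoped thread borrows one batch mutably and reports back on a channel.
fn run_scoped(batches: &mut [HashMap<i64, i64>]) -> i64 {
    let (sender, receiver) = channel();
    // `thread::scope` joins every thread spawned in it before returning,
    // which is why the closures may borrow `batches` (no 'static needed).
    thread::scope(|s| {
        for batch in batches.iter_mut() {
            let sender = sender.clone();
            s.spawn(move || {
                sender.send(process(batch)).unwrap();
            });
        }
    });
    drop(sender); // close the channel so the iterator below terminates
    receiver.iter().sum()
}

fn main() {
    let mut batches = vec![HashMap::from([(1, 1), (2, 2)]), HashMap::from([(3, 3)])];
    println!("total = {}", run_scoped(&mut batches));
    // The batches are usable again after the scope: the borrows have ended.
    println!("batch 0, key 2 = {}", batches[0][&2]);
}
```

The trade-off is that the caller blocks until every thread in the scope has finished.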

To add a bit more to what @kornel said, the compiler's error message is really saying that it expected the following:

impl Test<'static> 

but saw:

impl<'a> Test<'a>

I.e., it cannot reconcile the 'static requirement with the generic 'a. Of course, impl Test<'static> doesn't really make sense for your case: if you wrote it, the code would compile, but in all likelihood you wouldn't be able to use it, because you won't be working with 'static references.

Thanks @vitalyd and @kornel. Using scoped(), it works perfectly.
But Kornel mentioned that spawn() could work with Arc. Let's say I don't want to use scoped() because of its slight performance impact (stack vs. heap allocation); how would one use an Arc together with spawn?
I imagined something along these lines:

		let arc_receiver = Arc::new(receiver);
		thread_pool.spawn(move || {
			loop {
				let mut _incoming: Vec<HashMap<MachineId, (&mut Box<dyn Machine + Send>, Vec<&mut Box<dyn Machine + Send>>)>> = arc_receiver.try_iter().collect();
			}
		});

but then got this error:

error[E0277]: `std::sync::mpsc::Receiver<std::collections::HashMap<i64, (&mut std::boxed::Box<(dyn Machine + std::marker::Send + 'static)>, std::vec::Vec<&mut std::boxed::Box<(dyn Machine + std::marker::Send + 'static)>>)>>` cannot be shared between threads safely
  --> src/main.rs:34:15
   |
34 |         thread_pool.spawn(move || {
   |                     ^^^^^ `std::sync::mpsc::Receiver<std::collections::HashMap<i64, (&mut std::boxed::Box<(dyn Machine + std::marker::Send + 'static)>, std::vec::Vec<&mut std::boxed::Box<(dyn Machine + std::marker::Send + 'static)>>)>>` cannot be shared between threads safely
   |
   = help: the trait `std::marker::Sync` is not implemented for `std::sync::mpsc::Receiver<std::collections::HashMap<i64, (&mut std::boxed::Box<(dyn Machine + std::marker::Send + 'static)>, std::vec::Vec<&mut std::boxed::Box<(dyn Machine + std::marker::Send + 'static)>>)>>`
   = note: required because of the requirements on the impl of `std::marker::Send` for `std::sync::Arc<std::sync::mpsc::Receiver<std::collections::HashMap<i64, (&mut std::boxed::Box<(dyn Machine + std::marker::Send + 'static)>, std::vec::Vec<&mut std::boxed::Box<(dyn Machine + std::marker::Send + 'static)>>)>>>`
   = note: required because it appears within the type `[closure@src/main.rs:34:21: 38:4 arc_receiver:std::sync::Arc<std::sync::mpsc::Receiver<std::collections::HashMap<i64, (&mut std::boxed::Box<(dyn Machine + std::marker::Send + 'static)>, std::vec::Vec<&mut std::boxed::Box<(dyn Machine + std::marker::Send + 'static)>>)>>>]`

Who can save my day? 🙂

Because an Arc can be in two threads at once, its content can be accessed by two threads at the same time. So it demands a guarantee that doing so is safe (the Sync trait).

If two threads can't safely access the content at the same time, there's another helper for that: Mutex, which limits access to one thread at a time. It's like an adapter that makes non-Sync things Sync (as long as they are Send).

So Arc<Mutex<Stuff>> should satisfy the requirements of shared ownership and access from multiple threads.
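A small sketch of the Arc<Mutex<Receiver>> pattern, using std::thread::spawn (which, like rayon's ThreadPool::spawn, needs a Send + 'static closure). Several workers pull jobs from one shared Receiver and send results back on a second channel. The job type (u64), the squaring work and the sum_with_workers name are placeholders. Note that this only works when the channel's message type itself contains no borrowed references.

```rust
use std::sync::mpsc::channel;
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical job runner: several workers share one Receiver.
fn sum_with_workers(jobs: Vec<u64>, workers: usize) -> u64 {
    let (job_tx, job_rx) = channel::<u64>();
    let (result_tx, result_rx) = channel::<u64>();
    // Receiver is Send but not Sync; the Mutex makes it Sync,
    // and the Arc gives every worker an owned ('static) handle.
    let job_rx = Arc::new(Mutex::new(job_rx));

    let handles: Vec<_> = (0..workers)
        .map(|_| {
            let job_rx = Arc::clone(&job_rx);
            let result_tx = result_tx.clone();
            thread::spawn(move || loop {
                // The guard is dropped at the end of this statement, so the
                // lock is held only while taking one job, not while working.
                let job = job_rx.lock().unwrap().recv();
                match job {
                    Ok(n) => result_tx.send(n * n).unwrap(), // placeholder work
                    Err(_) => break, // all job senders dropped: no more work
                }
            })
        })
        .collect();

    for job in jobs {
        job_tx.send(job).unwrap();
    }
    drop(job_tx); // lets the workers' recv() return Err so they exit
    drop(result_tx); // only the workers' clones remain
    for handle in handles {
        handle.join().unwrap();
    }
    result_rx.iter().sum()
}

fn main() {
    println!("{}", sum_with_workers(vec![1, 2, 3, 4], 3));
}
```

Locking once per recv() lets the workers take turns; taking the guard once before the loop would keep every other worker waiting on the lock.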

Thanks, kornel. Do you mean something as simple as this:

		let my_receiver = Arc::new(Mutex::new(receiver));
		thread_pool.spawn(move || {
			let handle = my_receiver.lock().unwrap();
			loop {
				let mut _incoming: Vec<HashMap<MachineId, (&mut Box<dyn Machine + Send>, Vec<&mut Box<dyn Machine + Send>>)>> = handle.try_iter().collect();
			}
		});

because then I get the original error message again:

error[E0495]: cannot infer an appropriate lifetime for lifetime parameter `'a` due to conflicting requirements
  --> src/main.rs:29:25
   |
29 |         self.thread_data.push(ThreadData {
   |                               ^^^^^^^^^^
   |
note: first, the lifetime cannot outlive the lifetime 'a as defined on the impl at 25:6...
  --> src/main.rs:25:6
   |
25 | impl<'a> Test<'a> {
   |      ^^
   = note: ...so that the expression is assignable:
           expected ThreadData<'a>
              found ThreadData<'_>
   = note: but, the lifetime must be valid for the static lifetime...
note: ...so that the type `[closure@src/main.rs:34:21: 39:4 my_receiver:std::sync::Arc<std::sync::Mutex<std::sync::mpsc::Receiver<std::collections::HashMap<i64, (&mut std::boxed::Box<(dyn Machine + std::marker::Send + 'static)>, std::vec::Vec<&mut std::boxed::Box<(dyn Machine + std::marker::Send + 'static)>>)>>>>]` will meet its required lifetime bounds
  --> src/main.rs:34:15
   |
34 |         thread_pool.spawn(move || {
   |                     ^^^^^

That <'a> attached to ThreadData means it contains a reference, and references are not allowed. No wrapper type can fix that. The only way out is to get rid of the reference inside ThreadData.

Looking at your code, you have &'a mut Box. This type is useless here, for two reasons:

  • It's a reference (&mut) to a reference (Box), so you pay for double indirection (both are references; the difference is that one is a borrow and the other is owned).
  • The borrowed reference is exclusive, so it doesn't give you any more flexibility than owning the Box exclusively would. You only lose functionality, due to the restrictions on exclusive borrows.

Don't put temporary references inside types, because the scarlet <'a> curses the whole type, and every type that contains it, to be usable only temporarily too.

Just Box<dyn Foo> is fine.
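As a sketch of what that looks like for the reduced example: with owned Box<dyn Machine + Send> values in the message type, ThreadData loses its lifetime parameter and the worker closure becomes 'static. The advance/state methods, the Counter machine, the Batch alias, the results channel and start_worker are all hypothetical additions, and std::thread::spawn stands in for the thread pool.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread;

pub type MachineId = i64;

pub trait Machine {
    fn advance(&mut self); // hypothetical method
    fn state(&self) -> i64; // hypothetical method
}

// Hypothetical machine used only for this sketch.
struct Counter(i64);

impl Machine for Counter {
    fn advance(&mut self) {
        self.0 += 1;
    }
    fn state(&self) -> i64 {
        self.0
    }
}

// Owned boxes instead of &'a mut Box<...>: no lifetime parameter needed.
pub type Batch = HashMap<MachineId, Box<dyn Machine + Send>>;

pub struct ThreadData {
    sender: Sender<Batch>,
    results: Receiver<Batch>,
}

fn start_worker() -> ThreadData {
    let (sender, work_rx) = channel::<Batch>();
    let (result_tx, results) = channel::<Batch>();
    // The closure owns everything it touches, so it satisfies `Send + 'static`.
    thread::spawn(move || {
        for mut batch in work_rx {
            for machine in batch.values_mut() {
                machine.advance();
            }
            if result_tx.send(batch).is_err() {
                break; // the main side hung up
            }
        }
    });
    ThreadData { sender, results }
}

fn main() {
    let worker = start_worker();
    let mut batch: Batch = HashMap::new();
    batch.insert(1, Box::new(Counter(10)));
    batch.insert(2, Box::new(Counter(20)));
    worker.sender.send(batch).unwrap();
    let back = worker.results.recv().unwrap();
    println!("{} {}", back[&1].state(), back[&2].state());
}
```

Ownership of the machines travels to the worker and back, so nothing in ThreadData borrows from the caller.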

Yes, this helped. I learned something again, but I think I'll keep the design for now and use scoped. The problem is much deeper... maybe you have another suggestion.
I'll explain. I have a HashMap with machines, and I'm using multiple unsafe references to its entries while making sure no double borrows occur.
Each machine does its work in two phases: gather() and advance().
gather() tells the main loop which other machines it wants to interact with in the advance() phase and returns a Vec<MachineId>. So far everything is single-threaded.
In the next phase, every machine current_machine that has work to do runs current_machine.advance(other_machine_references). Before we can call advance on the current machine, we collect references to the other machines (unsafely from the HashMap, see below*) for each requested Vec<MachineId>, and then finally pass advance other_machine_references: Vec<&'a mut Box<dyn Machine + Send>>. Each execution of advance is moved to its own thread (actually only if it makes sense and no multiple mutable borrows occur; I can ensure both conditions before creating and using the references). This way each machine can interact with any other machine, and the main calculation can be executed multi-threaded.

*Unsafe, like this:

let pointer = object as *mut Box<dyn Machine + Send>;
out.push(unsafe { &mut *pointer });

So I'm already using multiple unsafe borrows from a HashMap, and so far it seemed like a good idea. Would you suggest removing each item from the global machine HashMap with remove() (which returns the item), passing owned boxes to each advance(Vec<Box<...>>) instead of pointers, and inserting them again once the thread has completed? I assumed this would be less performant: doing removes/inserts just for each advance() phase of each machine could add up to a huge cost. HashMap operations in my case generally seem to be between O(1) and O(n), which maybe isn't too bad. Still, I thought using references would be faster.

Anyway, which route would you take? For me, performance is key.
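For comparison, the remove-and-reinsert route described above might look roughly like this. It is a sketch only: the Machine struct with an energy field, the advance_group name, and the rule that the first machine pulls one unit of energy from each of the others are hypothetical stand-ins for the real gather()/advance() logic.

```rust
use std::collections::HashMap;
use std::thread;

type MachineId = i64;

// Hypothetical machine type for this sketch.
struct Machine {
    energy: u64,
}

/// Takes the requested machines out of the map, lets a worker thread own them
/// while it works, then puts them back. No references cross the thread boundary.
fn advance_group(machines: &mut HashMap<MachineId, Machine>, ids: &[MachineId]) {
    // remove() returns ownership; ids that are missing are skipped.
    let taken: Vec<(MachineId, Machine)> = ids
        .iter()
        .filter_map(|id| machines.remove(id).map(|m| (*id, m)))
        .collect();

    let worker = thread::spawn(move || {
        let mut taken = taken;
        // Treat the first machine as the "current" one: it pulls one unit of
        // energy from each of the others (a stand-in for advance()).
        if let Some(((_, current), others)) = taken.split_first_mut() {
            for (_, other) in others.iter_mut() {
                if other.energy > 0 {
                    other.energy -= 1;
                    current.energy += 1;
                }
            }
        }
        taken
    });

    // Reinsert once the thread has completed.
    for (id, machine) in worker.join().unwrap() {
        machines.insert(id, machine);
    }
}

fn main() {
    let mut machines = HashMap::new();
    machines.insert(1, Machine { energy: 0 });
    machines.insert(2, Machine { energy: 5 });
    advance_group(&mut machines, &[1, 2]);
    println!("{} {}", machines[&1].energy, machines[&2].energy);
}
```

Per machine and phase this costs one remove and one insert (average O(1) each) plus moving the value; whether that is noticeable next to the real work is something to measure rather than assume.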

Apart from differences in ownership and in the representation of fat pointers, &Box is like && or Box<Box<>>. That's not a type you would want to use unless you have to send trait/closure pointers to C.

Keep in mind that Box (and Arc) can be cast to & at zero cost. So when you have Box<dyn Foo> you can reference it as &dyn Foo (not &Box<dyn Foo>).
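A tiny sketch of that reborrow, with a made-up trait Foo and implementor A: the function takes &dyn Foo, and both a Box<dyn Foo> and an Arc<dyn Foo> can be passed to it via &* without allocating or copying.

```rust
use std::sync::Arc;

trait Foo {
    fn name(&self) -> String;
}

// Hypothetical implementor for this sketch.
struct A;

impl Foo for A {
    fn name(&self) -> String {
        "A".to_string()
    }
}

// Takes a plain trait-object reference instead of &Box<dyn Foo>.
fn describe(foo: &dyn Foo) -> String {
    format!("machine {}", foo.name())
}

fn main() {
    let boxed: Box<dyn Foo> = Box::new(A);
    // `&*boxed` reborrows the Box's contents as `&dyn Foo`: no allocation, no copy.
    println!("{}", describe(&*boxed));

    let shared: Arc<dyn Foo> = Arc::new(A);
    // The same works for Arc.
    println!("{}", describe(&*shared));
}
```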

You should consider using Arc<dyn Machine + Send + Sync> for the shared ownership (an Arc can only be sent to another thread if its contents are both Send and Sync). Compared to Box, the overhead of Arc is zero when you use it and tiny when you clone() it. It's probably even faster than &Box, since it has only a single level of indirection.
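A minimal sketch of sharing trait objects through Arc across threads. The Machine trait's output method and the Generator type are hypothetical; the point is that the dyn type needs Send + Sync for Arc<dyn ...> itself to be Send, and that each thread gets its own handle by cloning.

```rust
use std::sync::Arc;
use std::thread;

trait Machine {
    fn output(&self) -> u64; // hypothetical read-only method
}

// Hypothetical machine for this sketch.
struct Generator {
    rate: u64,
}

impl Machine for Generator {
    fn output(&self) -> u64 {
        self.rate
    }
}

/// Each thread gets its own Arc handle; `Send + Sync` on the trait object is
/// what makes `Arc<dyn Machine + Send + Sync>` itself `Send`.
fn total_output(machines: &[Arc<dyn Machine + Send + Sync>]) -> u64 {
    let handles: Vec<_> = machines
        .iter()
        .map(|m| {
            let m = Arc::clone(m); // only bumps an atomic reference count
            thread::spawn(move || m.output())
        })
        .collect();
    handles.into_iter().map(|h| h.join().unwrap()).sum()
}

fn main() {
    let mut machines: Vec<Arc<dyn Machine + Send + Sync>> = Vec::new();
    machines.push(Arc::new(Generator { rate: 3 }));
    machines.push(Arc::new(Generator { rate: 4 }));
    println!("total = {}", total_output(&machines));
}
```

Arc only hands out shared (&) access, so methods that mutate a machine would need interior mutability (for example a Mutex inside) or one of the other approaches in this thread.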

Another fast solution is not to use any references at all. Instead, refer to machines by an integer index into some shared Vec or arena. That is convenient if you don't need to delete objects (if you add and remove them often, look into the ECS pattern).
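A sketch of the index-based idea, assuming machines never need to be deleted: they live in one Vec, and a hypothetical wants field stores the indices of the machines each one interacts with, a bit like the output of gather(). One step first only reads (collecting transfers), then applies them; the energy rule is made up for illustration.

```rust
/// Machines live in one Vec and refer to each other by index, not by reference.
struct Machine {
    energy: u64,
    wants: Vec<usize>, // hypothetical: indices of machines this one pulls energy from
}

/// One step: phase 1 only reads (collects transfers), phase 2 applies them.
/// Plain `usize` indices carry no lifetime, so they could be sent anywhere.
fn step(world: &mut [Machine]) {
    let mut transfers: Vec<(usize, usize)> = Vec::new(); // (from, to)
    for (to, machine) in world.iter().enumerate() {
        for &from in &machine.wants {
            transfers.push((from, to));
        }
    }
    for (from, to) in transfers {
        if world[from].energy > 0 {
            world[from].energy -= 1;
            world[to].energy += 1;
        }
    }
}

fn main() {
    let mut world = vec![
        Machine { energy: 0, wants: vec![1] },
        Machine { energy: 2, wants: vec![] },
        Machine { energy: 0, wants: vec![1] },
    ];
    step(&mut world);
    let energies: Vec<u64> = world.iter().map(|m| m.energy).collect();
    println!("{:?}", energies);
}
```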


Have you considered having advance not modify the other machines directly, but instead send the mutations over the channel back to your main thread, and having the main thread apply them? You can keep the HashMap read-only behind an Arc, so the workers get cheap (and 'static) handles to inspect it. Once the workers are done and have dropped their Arc handles, your main thread can obtain a mutable borrow via Arc::get_mut.
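A sketch of that suggestion, under simplifying assumptions: the world is a HashMap from MachineId to an i64 "energy" value, the Mutation enum and run_round are hypothetical, and each worker's "advance" just adds up its neighbours' values. Workers read a consistent snapshot through their Arc clones and only request changes; the main thread applies them after Arc::get_mut succeeds.

```rust
use std::collections::HashMap;
use std::sync::mpsc::channel;
use std::sync::Arc;
use std::thread;

type MachineId = i64;

// Hypothetical mutation a worker can request; the main thread applies it.
enum Mutation {
    AddEnergy(MachineId, i64),
}

fn run_round(world: &mut Arc<HashMap<MachineId, i64>>, ids: &[MachineId]) {
    let (tx, rx) = channel();
    let handles: Vec<_> = ids
        .iter()
        .map(|&id| {
            let world = Arc::clone(world); // cheap, 'static, read-only handle
            let tx = tx.clone();
            thread::spawn(move || {
                // Read the other machines freely; request changes instead of mutating.
                let neighbours: i64 = world
                    .iter()
                    .filter(|(k, _)| **k != id)
                    .map(|(_, v)| *v)
                    .sum();
                tx.send(Mutation::AddEnergy(id, neighbours)).unwrap();
            })
        })
        .collect();
    drop(tx);
    for handle in handles {
        handle.join().unwrap(); // each worker drops its Arc clone when it finishes
    }
    // This is now the only handle, so get_mut succeeds.
    let map = Arc::get_mut(world).expect("a worker still holds a handle");
    for mutation in rx {
        match mutation {
            Mutation::AddEnergy(id, amount) => *map.entry(id).or_insert(0) += amount,
        }
    }
}

fn main() {
    let mut world = Arc::new(HashMap::from([(1, 10), (2, 20), (3, 30)]));
    run_round(&mut world, &[1, 2]);
    println!("{:?} {:?} {:?}", world.get(&1), world.get(&2), world.get(&3));
}
```

Because every worker sees the state from before the round, the outcome does not depend on thread scheduling.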


These are good suggestions, I'll be thinking about it. Thank you, I like them!

Could updating the HashMap values also be done in separate threads? For instance, if thread_count = 4, there could be 4 "HashMap slices", like with vectors, whose values could be updated separately. The SplitMut and multi_mut crates already do something similar. Would this be possible and reasonable?

Since indexing is via a hash, how do you make disjoint mutable slices? What happens when hashes collide, or their collision-chain tails collide?

As @TomP points out, there’s no good way to “slice” a HashMap.

But, perhaps you can maintain 4 separate HashMaps (or however many threads you have), and then move each of these maps to a separate worker thread. The worker will have ownership, can mutate, and then send it back to your main thread.

You’ll have to somehow make sure that you partition your data over the HashMaps evenly to avoid load imbalance, but perhaps this isn’t difficult in your case (e.g., if each item in the map carries equal processing cost, you can just split them by count).
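A minimal sketch of the sharding idea, assuming machines can be assigned to shards by id (here id modulo the shard count, which must be greater than zero) and that the per-machine work is independent within a round. The partition and process_shards names and the u64 values are hypothetical; each shard is moved into its own thread, mutated there, and handed back.

```rust
use std::collections::HashMap;
use std::thread;

type MachineId = i64;

/// Splits one map into `n` owned shards by key (id modulo n); assumes n > 0.
fn partition(all: HashMap<MachineId, u64>, n: usize) -> Vec<HashMap<MachineId, u64>> {
    let mut shards = vec![HashMap::new(); n];
    for (id, value) in all {
        let shard = id.rem_euclid(n as i64) as usize;
        shards[shard].insert(id, value);
    }
    shards
}

/// Moves each shard into its own thread, mutates it there, and collects the shards back.
fn process_shards(shards: Vec<HashMap<MachineId, u64>>) -> Vec<HashMap<MachineId, u64>> {
    let handles: Vec<_> = shards
        .into_iter()
        .map(|mut shard| {
            thread::spawn(move || {
                for value in shard.values_mut() {
                    *value += 1; // stand-in for the real per-machine work
                }
                shard
            })
        })
        .collect();
    handles.into_iter().map(|h| h.join().unwrap()).collect()
}

fn main() {
    let all: HashMap<MachineId, u64> = (0..10).map(|id| (id, 0)).collect();
    let shards = partition(all, 4);
    let sizes: Vec<usize> = shards.iter().map(|s| s.len()).collect();
    println!("shard sizes: {:?}", sizes);
    let merged: HashMap<MachineId, u64> = process_shards(shards).into_iter().flatten().collect();
    println!("{} machines processed", merged.len());
}
```

Splitting by id modulo the shard count balances the load only if ids are spread evenly and every machine costs about the same; otherwise a smarter assignment is needed.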