Hey, in the process of trying to address issues mentioned in the code review of my project, I have run into a road block trying to remove the use of unsafe safe code and removing Send from restriction from Y for the function below.
I have read similar post in this forum and most of those end up suggesting using Crossbeam's or Rayon's scoped threads and/or channels to solve the problem. Regrettably, this is not something I can do in the scope of my project.
pub fn for_each<'env, T, Y>(&mut self, cb: T, slice: &'env mut [Y]) -> Result<(), Error>
where
T: Fn(&mut [Y], usize, usize) + 'env + Send + Sync,
Y: Send, // <- Trying to remove this
{
let mut parent_job = self.create_empty()?
let divisor = self.threads.len();
let group_size = (slice.len() / divisor).max(divisor);
let mut offset = 0_usize;
let mut remaining = slice.len();
while remaining != 0 {
let range = group_size.min(remaining);
let slice_ptr = unsafe { slice.as_mut_ptr().add(offset) }; // <- trying to remove this
let work_slice = unsafe { std::slice::from_raw_parts_mut(slice_ptr, range) }; // <- trying to remove this
let callback = &cb;
let child_job = self.create_with_parent(&mut parent_job, move || {
callback(work_slice, offset, offset + work_slice.len());
})?;
self.run(&child_job)?;
remaining -= range;
offset += range;
}
self.run(&parent_job)?;
self.wait(&parent_job);
Ok(())
}
It was suggested I use std::slice::split_at_mut()
and I also found a couple of posts suggesting the use of std::slice::chuncks()
method. Base on that I now have teh following code:
pub fn for_each<'env, T, Y>(&mut self, cb: T, slice: &'env mut [Y]) -> Result<(), Error>
where
T: Fn(&mut [Y], usize, usize) + 'env + Send + Sync, Y:Send,
{
let parent_job = self.create_noop()?;
const DEFAULT_GROUP_SIZE: usize = 64;
let divisor = self.threads.len();
let group_size = (slice.len() / divisor).max(DEFAULT_GROUP_SIZE);
let mut offset = 0_usize;
for work_slice in slice.chunks_mut(group_size) {
let callback = &cb;
let child_job = self.create_with_parent(&parent_job, move || {
callback(work_slice, offset, offset + work_slice.len());
})?;
self.run(&child_job)?;
}
self.run(&parent_job)?;
self.wait(&parent_job)?;
Ok(())
}
And I'm currently stuck on this compiler error with rustc 1.50.0:
error[E0277]: `Y` cannot be sent between threads safely
--> src/lib.rs:318:34
|
318 | let child_job = self.create_with_parent(&parent_job, move || {
| ^^^^^^^^^^^^^^^^^^ `Y` cannot be sent between threads safely
|
= note: required because it appears within the type `[Y]`
= note: required because of the requirements on the impl of `Send` for `&mut [Y]`
= note: required because it appears within the type `[closure@src/lib.rs:318:66: 320:14]`
help: consider further restricting type parameter `Y`
|
308 | T: FnMut(&mut [Y], usize, usize) + 'env + Send + Sync, Y: Send
|
If I add the restriction that Y:Sync + Send
, this code happily compiles. That being said, I can guarantee that the input slice is split safely among the worker threads and that no one else during the scope of that function will be modifying it.
I'm not sure what else I can do to make the compiler understand that Y can safely be accessed form multiple threads without resorting to unsafe.
The function above is meant to be used in contexts such as:
let mut js = jobsys::JobSystem::new(4, 512).unwrap();
let mut array = [0_u32; 100];
js.for_each(|slice: &mut [u32], start, _end| {
for i in 0..slice.len() {
slice[i] = (start + i) as u32;
}
},
&mut array).expect("Failed to start jobs");
Does anyone have any suggestions? Am I missing something very obvious?