Sharing slice chunks across threads: compiler requires slice type to satisfy Send + Sync

Hey, while addressing issues raised in a code review of my project, I have hit a roadblock trying to remove the unsafe code and the Send restriction on Y from the function below.

I have read similar posts in this forum, and most of them end up suggesting Crossbeam's or Rayon's scoped threads and/or channels to solve the problem. Regrettably, this is not something I can do in the scope of my project.

pub fn for_each<'env, T, Y>(&mut self, cb: T, slice: &'env mut [Y]) -> Result<(), Error>
    where
        T: Fn(&mut [Y], usize, usize) + 'env + Send + Sync,
        Y: Send, // <- Trying to remove this
    {
        let mut parent_job = self.create_empty()?;
        let divisor = self.threads.len();
        let group_size = (slice.len() / divisor).max(divisor);
        let mut offset = 0_usize;
        let mut remaining = slice.len();
        while remaining != 0 {
            let range = group_size.min(remaining);
            let slice_ptr = unsafe { slice.as_mut_ptr().add(offset) }; // <- trying to remove this
            let work_slice = unsafe { std::slice::from_raw_parts_mut(slice_ptr, range) }; // <- trying to remove this
            let callback = &cb;
            let child_job = self.create_with_parent(&mut parent_job, move || {
                callback(work_slice, offset, offset + work_slice.len());
            })?;
            self.run(&child_job)?;
            remaining -= range;
            offset += range;
        }
        self.run(&parent_job)?;
        self.wait(&parent_job);
        Ok(())
    }

It was suggested I use std::slice::split_at_mut(), and I also found a couple of posts suggesting the std::slice::chunks() method. Based on that I now have the following code:

pub fn for_each<'env, T, Y>(&mut self, cb: T, slice: &'env mut [Y]) -> Result<(), Error>
    where
        T: Fn(&mut [Y], usize, usize) + 'env + Send + Sync,
        Y: Send,
    {
        let parent_job = self.create_noop()?;
        const DEFAULT_GROUP_SIZE: usize = 64;
        let divisor = self.threads.len();
        let group_size = (slice.len() / divisor).max(DEFAULT_GROUP_SIZE);
        let mut offset = 0_usize;
        for work_slice in slice.chunks_mut(group_size) {
            let callback = &cb;
            let len = work_slice.len();
            let child_job = self.create_with_parent(&parent_job, move || {
                callback(work_slice, offset, offset + work_slice.len());
            })?;
            self.run(&child_job)?;
            offset += len; // advance so the next chunk reports its real start index
        }
        self.run(&parent_job)?;
        self.wait(&parent_job)?;
        Ok(())
    }

And I'm currently stuck on this compiler error with rustc 1.50.0:

error[E0277]: `Y` cannot be sent between threads safely
   --> src/lib.rs:318:34
    |
318 |             let child_job = self.create_with_parent(&parent_job, move || {
    |                                  ^^^^^^^^^^^^^^^^^^ `Y` cannot be sent between threads safely
    |
    = note: required because it appears within the type `[Y]`
    = note: required because of the requirements on the impl of `Send` for `&mut [Y]`
    = note: required because it appears within the type `[closure@src/lib.rs:318:66: 320:14]`
help: consider further restricting type parameter `Y`
    |
308 |         T: FnMut(&mut [Y], usize, usize) + 'env + Send + Sync, Y: Send
    |            

If I add the restriction that Y:Sync + Send, this code happily compiles. That being said, I can guarantee that the input slice is split safely among the worker threads and that no one else during the scope of that function will be modifying it.

I'm not sure what else I can do to make the compiler understand that Y can safely be accessed from multiple threads without resorting to unsafe.

The function above is meant to be used in contexts such as:

let mut js = jobsys::JobSystem::new(4, 512).unwrap();
let mut array = [0_u32; 100];
js.for_each(|slice: &mut [u32], start, _end| {
        for i in 0..slice.len() {
            slice[i] = (start + i) as u32;
        }
    },
    &mut array).expect("Failed to start jobs");

Does anyone have any suggestions? Am I missing something very obvious?

Are you saying it doesn't compile with Y: Send but it does compile with Y: Sync + Send? That would be weird because the error message you present complains about a missing Y: Send, AFAICT. Or do you want to get rid of the Send, too? But you do send some &mut [Y] to worker threads, so Y: Send is a requirement.

Exactly. If I add where Y: Send, I get the following error:

error[E0277]: `Y` cannot be shared between threads safely
   --> src/lib.rs:318:34
    |
318 |               let child_job = self.create_with_parent(&parent_job, move || {
    |  __________________________________^^^^^^^^^^^^^^^^^^______________-
    | |                                  |
    | |                                  `Y` cannot be shared between threads safely
319 | |                 callback(work_slice, offset, offset + work_slice.len());
320 | |             })?;
    | |_____________- within this `[closure@src/lib.rs:318:66: 320:14]`
    |
    = note: required because it appears within the type `[Y]`
    = note: required because it appears within the type `&mut [Y]`
    = note: required because it appears within the type `[closure@src/lib.rs:318:66: 320:14]`
help: consider further restricting this bound
    |
308 |         T: Fn(&mut [Y], usize, usize) + 'env + Send + Sync, Y: Send + Sync
    |        

What is the signature of create_with_parent?

pub fn create_with_parent<T, Y>(
        &mut self,
        parent: &JobHandle<Y>,
        job: T,
    ) -> Result<JobHandle<T>, Error>
    where
        T: Sized + FnOnce() + Send + Sync,

Please include the where bounds.

My bad, updated previous post

If you remove the T: Sync bound, does that let you remove the Y: Sync bound?

Yes it does :no_mouth:. Did I misuse the Sync trait?

Well, you didn't need to send a &T across threads in create_with_parent, so you were more strict than you had to be.
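
To make that concrete, here is a minimal sketch of the same situation outside your job system. The names run_on_worker and fill are made up for illustration, and std::thread::scope (Rust 1.63+, so newer than the 1.50 used in this thread) stands in for create_with_parent plus run. Cell<u32> is Send but not Sync, and the closure holding a &mut [Cell<u32>] is still accepted because only Send is required:

```rust
use std::cell::Cell;
use std::thread;

// Hypothetical stand-in for `create_with_parent` + `run`: the closure is
// moved to exactly one worker thread, so only `Send` is required.
fn run_on_worker<F>(job: F)
where
    F: FnOnce() + Send,
{
    // `thread::scope` joins the spawned thread before returning.
    thread::scope(|s| {
        s.spawn(job);
    });
}

// `Cell<u32>` is `Send` but not `Sync`, so it models an element type
// that satisfies `Y: Send` only.
fn fill(cells: &mut [Cell<u32>]) {
    for chunk in cells.chunks_mut(2) {
        // The closure captures `chunk: &mut [Cell<u32>]`, which is `Send`
        // because `Cell<u32>: Send`. No `Sync` bound is involved.
        run_on_worker(move || {
            for c in chunk.iter() {
                c.set(c.get() + 1);
            }
        });
    }
}
```

If run_on_worker also demanded F: Sync, this would stop compiling for the same reason your code did: the closure would have to be shareable by reference, which in turn needs the captured slice elements to be Sync.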


If you're saying that two slices don't overlap, the compiler already knows that.

If you're saying that no value of Y in the slice can make an unsynchronized access to the same data that another value in the slice is accessing (e.g. they aren't trying to clone two Rcs that point to the same data), then you have to either add the Y: Send bound or make the function unsafe and expect the caller to guarantee that no such access happens (this may allow more use cases, but it's unsafe for a reason!). If you do neither, someone could write something like this, which would eventually cause a use-after-free:

let mut js = jobsys::JobSystem::new(4, 512).unwrap();
let rc = std::rc::Rc::new(0);
let mut v = vec![std::rc::Rc::clone(&rc); 100];
js.for_each(|slice, _, _| {
    let rc = &slice[0];
    for _ in 0..10000 {
        rc.clone();
    }
}, &mut *v).unwrap();
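
Both points can be seen in a small standalone sketch. This is not your job system: for_each_chunk is a hypothetical helper, and it uses std::thread::scope (Rust 1.63+) in place of your worker threads. chunks_mut hands out non-overlapping &mut [Y] slices with no unsafe, and the Y: Send bound is what makes the compiler reject the Rc example above:

```rust
use std::thread;

// Hypothetical helper: runs `cb` on each disjoint chunk of `slice`,
// one scoped thread per chunk.
fn for_each_chunk<Y, F>(slice: &mut [Y], chunk_size: usize, cb: F)
where
    Y: Send, // each `&mut [Y]` chunk is moved to another thread
    F: Fn(&mut [Y], usize, usize) + Sync, // `&cb` is shared by all workers
{
    assert!(chunk_size > 0, "chunk_size must be non-zero");
    let cb = &cb;
    thread::scope(|s| {
        let mut offset = 0;
        for chunk in slice.chunks_mut(chunk_size) {
            let start = offset;
            let end = start + chunk.len();
            offset = end;
            // The borrow checker already knows the chunks do not overlap.
            s.spawn(move || cb(chunk, start, end));
        }
    });
    // Calling this with a `Vec<Rc<i32>>` fails to compile, because
    // `Rc<i32>` is not `Send`.
}
```

Called with a [u32; 10] and a closure that writes start + i into each element, this fills the array with 0 through 9.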

Ah I see, thanks for the help :slight_smile:

You're welcome. Generally a type is Sync if and only if an immutable reference to the type is Send.
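
If it helps, that rule can be checked at compile time with a couple of empty generic functions. The helper names below are made up for illustration; each call only type-checks when the bound holds:

```rust
use std::cell::Cell;
use std::sync::Mutex;

// Compile-time checks: a call compiles only if `T` satisfies the bound.
fn assert_send<T: ?Sized + Send>() {}
fn assert_sync<T: ?Sized + Sync>() {}

fn check_bounds() {
    // `Mutex<u32>` is `Sync`, so a shared reference to it is `Send`.
    assert_sync::<Mutex<u32>>();
    assert_send::<&Mutex<u32>>();

    // `Cell<u32>` is `Send` but not `Sync`.
    assert_send::<Cell<u32>>();
    // An exclusive reference only needs its target to be `Send`.
    assert_send::<&mut [Cell<u32>]>();

    // These two would fail to compile, because `Cell<u32>` is not `Sync`:
    // assert_sync::<Cell<u32>>();
    // assert_send::<&Cell<u32>>();
}
```

The &mut [Cell<u32>] line is the case from this thread: a mutable chunk can cross threads with only Y: Send.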
