Modifying disjoint subsets of a vector by two threads


#1

I learned Rust yesterday and began to think of problems that can be written nicely in C but are awkward in Rust. For example, consider a problem where a large vector is modified by a pair of threads, each thread accessing a different subset of the elements. This subset could be the even indices in the first iteration, the indices congruent to 0 or 1 mod 4 in the second iteration, and so on.

I have written programs to solve this example task in C and, in two different ways, in Rust.

// gcc vectorsubset.c  -lpthread -O3
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>

void operate(float *v, long n, long iteration, long selection, float x)
{
	long size=1<<iteration;
	for(long i=0;i<n;i++)
	{
		if((i/size)%2==selection)v[i]*=x;
	}
}

struct info
{
	float *v;
	long n;
	long iteration;
};

void *operate_child(void *data)
{
	struct info *info=data;
	operate(info->v,info->n,info->iteration,1,3.0);
	return NULL; // a pthread start routine must return a void * value
}

int main()
{
	long k=23;
	long n=1<<k;
	float *v=malloc(n*sizeof(float));
	for(long i=0;i<n;i++)v[i]=1.0;
	for(int iteration=0;iteration<k;iteration++)
	{
		pthread_t child;
		struct info info={v,n,iteration};
		pthread_create(&child, NULL, operate_child, (void *)&info);
		operate(v,n,iteration,0,2.0);
		pthread_join(child,NULL);
	}
	float total=0.0;
	for(long i=0;i<n;i++)total+=v[i];
	printf("total=%f\n",total);
}

// rustc -O vectorsubset.rs
use std::{mem,thread,ptr};
fn separate<'a>(v: &'a mut Vec<f32>, f:&Fn(usize)->bool ) -> (Vec<&'a mut f32>,Vec<&'a mut f32>)
{
	let n=v.len();
	let mut r1=Vec::new();
	let mut r2=Vec::new();
	let mut w : &mut [f32] =v;
	for iteration in 0..n
	{
		let tmp=mem::replace(&mut w,&mut []);
		let (head,tail) = tmp.split_at_mut(1);
		w=tail;
		if f(iteration)
		{
			r1.push(&mut head[0]);
		}
		else
		{
			r2.push(&mut head[0]);
		}
	}
	(r1,r2)
}

fn operate(v : Vec<&mut f32>, x: f32)
{
	for elem in v.into_iter()
	{
		*elem *= x;
	}
}

static mut theVector : *mut Vec<f32>=0 as *mut Vec<f32>;

fn create_static_vector(n:usize)
{
	unsafe{
		// Box the Vec and leak the box: the heap slot then has exactly the size
		// of a Vec<f32> header, and the pointer stays valid for the whole program.
		theVector = Box::into_raw(Box::new(vec![1.0;n]));
	}
}

fn get_static_vector() -> &'static mut Vec<f32>
{
	unsafe{
		&mut *theVector
	}
}

fn main()
{
	let k=23;
	let n=2u32.pow(k);
	create_static_vector(n as usize);
	for iteration in 0..k
	{
		let size=2u32.pow(iteration);
		let f = |i| (i/size as usize)%2==0;
		let (A,B) = separate(get_static_vector(),&f);
		let handle=thread::spawn(move||{
			operate(B,3.0);
		});
		operate(A,2.0);

		handle.join();
	}
	let total : f32 = get_static_vector().iter().sum();
	println!("total={}",total);
}

// rustc -O vectorsubset2.rs

use std::{mem,thread,ptr};

struct TrueIt<'a>
{
	v : &'a Vec<f32>,
	f : &'a (Fn(usize,u32)->bool + Send + Sync + 'static),
	curr : usize,
	data: u64,
	aux: u32,
}

struct FalseIt<'a>
{
	v : &'a Vec<f32>,
	f : &'a (Fn(usize,u32)->bool + Send + Sync + 'static),
	curr : usize,
	data: u64,
	aux: u32,
}

impl<'a> Iterator for TrueIt<'a>
{
	type Item = &'a mut f32;
	fn next(&mut self) -> Option<&'a mut f32>
	{
		while self.curr<self.v.len() && !(*self.f)(self.curr,self.aux)
		{
			self.curr+=1;
		}
		if self.curr<self.v.len()
		{
			let tmp=unsafe{
				&mut *((self.data +4*(self.curr as u64)) as *mut f32)
			};
			self.curr+=1;
			Some(tmp)
		}
		else
		{
			None
		}
	}
}

impl<'a> Iterator for FalseIt<'a>
{
	type Item = &'a mut f32;
	fn next(&mut self) -> Option<&'a mut f32>
	{
		while self.curr<self.v.len() && (*self.f)(self.curr,self.aux)
		{
			self.curr+=1;
		}
		if self.curr<self.v.len()
		{
			let tmp=unsafe{
				&mut *((self.data +4*(self.curr as u64)) as *mut f32)
			};
			self.curr+=1;
			Some(tmp)
		}
		else
		{
			None
		}
	}
}

fn separate<'a>(v:&'a mut Vec<f32>, f:&'a (Fn(usize,u32)->bool+Send+Sync+'static), aux:u32) -> (TrueIt<'a>, FalseIt<'a>)
{
	let data=&mut v[0] as *mut f32 as u64;
	let tr=TrueIt{
		v: v,
		f: f,
		curr: 0,
		data: data,
		aux: aux,
	};
	let fa=FalseIt{
		v: v,
		f: f,
		curr: 0,
		data: data,
		aux: aux,
	};
	(tr,fa)
}

static mut theVector : *mut Vec<f32>=0 as *mut Vec<f32>;

fn create_static_vector(n:usize)
{
	unsafe{
		// Box the Vec and leak the box: the heap slot then has exactly the size
		// of a Vec<f32> header, and the pointer stays valid for the whole program.
		theVector = Box::into_raw(Box::new(vec![1.0;n]));
	}
}

fn get_static_vector() -> &'static mut Vec<f32>
{
	unsafe{
		&mut *theVector
	}
}

fn selection_function(i:usize, size:u32) -> bool
{
	(i/size as usize)%2==0
}

fn main()
{
	let k=23;
	let n=2u32.pow(k);
	create_static_vector(n as usize);
	for iteration in 0..k
	{
		let size=2u32.pow(iteration);
		let (A,B) = separate(get_static_vector(),&selection_function,size);
		let handle=thread::spawn(move||{
			for elem in B
			{
				*elem*=3.0;
			}
		});
		for elem in A
		{
			*elem*=2.0;
		}

		handle.join();
	}
	let total : f32 = get_static_vector().iter().sum();
	println!("total={}",total);
}

The C code is very simple. The first Rust program creates a pair of Vec<&'a mut f32> to select where each thread works. I had to unsafely allocate the vector as a static so that it lives long enough to be passed to the thread; perhaps this can be solved in a safe way. The second Rust program creates iterators instead, thereby avoiding the vectors of references. However, creating these iterators is similar to implementing split_at_mut, and hence some unsafe operations seem to be necessary.

It is nice to see that, when compiling with optimizations, Rust’s times are just a little above C’s.

I would like to know if this problem can be solved in a cleaner way.


#2

Can you please add code markup to your post so that the Rust code’s indentation renders correctly? You can do so using triple backticks:

```rust
// Your code goes here
```

Also, looking at your C code, I am not sure why you would want to use threads in this situation, and why you would want to use them in this fashion:

  • The computation is memory bound, so adding more threads is unlikely to help that much. On an array that is large enough for the computation time to compensate for the threading overhead, a well-vectorized version would already likely saturate your RAM’s bandwidth on a single core.
  • The alternating memory access pattern of your threads will cause false sharing, thrashing your CPU caches and hurting your memory access performance as a result.
  • Letting the problem domain dictate your number of threads like this is generally a bad idea as it leads to oversubscription on single-core machines and resource starvation on many-core machines.

Personally, I would rather approach this problem by building a sequential version which operates on a slice of the original array with a certain offset, then splitting the array as much as necessary and processing the resulting slices in parallel with rayon. Can you go into a bit more detail about the motivation for writing the code like this?
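As a rough sketch of that approach: the kernel below works on a contiguous chunk and is told the chunk's global offset, so it can still evaluate the index-based selection. To keep the example dependency-free it uses `std::thread::scope` (standard library, Rust 1.63 and later) as a stand-in for rayon. The names `operate_chunk` and `operate_parallel` and the `workers` parameter are made up for illustration.

```rust
use std::thread;

/// Sequential kernel: `chunk` holds the elements of the full array starting at
/// global index `offset`. Elements in even-numbered blocks of `size` indices
/// are multiplied by 2.0, the others by 3.0. `size` must be non-zero.
fn operate_chunk(chunk: &mut [f32], offset: usize, size: usize) {
    for (local, x) in chunk.iter_mut().enumerate() {
        let i = offset + local;
        *x *= if (i / size) % 2 == 0 { 2.0 } else { 3.0 };
    }
}

/// Splits `v` into contiguous chunks (roughly one per worker) and runs the
/// kernel on each chunk in its own scoped thread.
fn operate_parallel(v: &mut [f32], size: usize, workers: usize) {
    let workers = workers.max(1);
    let chunk_len = ((v.len() + workers - 1) / workers).max(1);
    thread::scope(|s| {
        for (c, chunk) in v.chunks_mut(chunk_len).enumerate() {
            // Each thread owns its chunk exclusively, so no unsafe is needed.
            s.spawn(move || operate_chunk(chunk, c * chunk_len, size));
        }
    });
}
```

Calling `operate_parallel(&mut v, size, 4)` once per iteration would replace both `operate` calls of the C version. Because every thread touches one contiguous region, the false sharing concern is limited to chunk boundaries.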


#3

I edited the post to have better formatting, as you suggested.

It is not a real problem. I am just trying to see what things are possible to do with Rust. Specifically, how to allow more than one thread to modify parts of the same vector which are known to be disjoint.

For the posed example you are probably right and there are more efficient programs.


#4

Actually, the sequential version takes double the time.

// gcc seq.c -O3 -o seq
#include <stdio.h>
#include <stdlib.h>

void operate(float *v, long n, long iteration, long selection, float x)
{
	long size=1<<iteration;
	for(long i=0;i<n;i++)
	{
		if((i/size)%2==selection)v[i]*=x;
	}
}

int main()
{
	long k=23;
	long n=1<<k;
	float *v=malloc(n*sizeof(float));
	for(long i=0;i<n;i++)v[i]=1.0;
	for(int iteration=0;iteration<k;iteration++)
	{
		operate(v,n,iteration,0,2.0);
		operate(v,n,iteration,1,3.0);
	}
	float total=0.0;
	for(long i=0;i<n;i++)total+=v[i];
	printf("total=%f\n",total);
}

A little benchmarking

time ./seq
total=11905515126784000.000000
4.03user 0.00system 0:04.04elapsed 100%CPU (0avgtext+0avgdata 34200maxresident)k
0inputs+0outputs (0major+8260minor)pagefaults 0swaps
time ./a.out
total=11905515126784000.000000
4.11user 0.00system 0:02.09elapsed 197%CPU (0avgtext+0avgdata 34372maxresident)k
0inputs+0outputs (0major+8271minor)pagefaults 0swaps
time ./vectorsubset
total=11905515000000000
2.85user 0.06system 0:02.68elapsed 108%CPU (0avgtext+0avgdata 178408maxresident)k
0inputs+0outputs (0major+44118minor)pagefaults 0swaps
time ./vectorsubset2
total=11905515000000000
4.11user 0.00system 0:02.20elapsed 186%CPU (0avgtext+0avgdata 35252maxresident)k
0inputs+0outputs (0major+8340minor)pagefaults 0swaps

So using threads improves performance. I have not analyzed the details, but the cache problem should only occur in the very first iterations.

But I do not want to lose perspective. The point of interest is whether this can be done in Rust in a more elegant way. In efficiency we are already very close to C.


#5

The first time I read the topic I didn’t reply; my answer would have been a short remark that this is a toy problem rather than something of practical use.

It later came to me what was really nagging me. You haven’t given a problem. You have given a solution and want it written the same way in different languages. This is a bad approach, as it tries to force the languages to be the same. (Ideally, even the data structures should not be dictated by the problem.)


#6

Thanks for the edits, the code looks much better now!

Rust’s borrow checker is not able to prove that two threads cannot access the same indices in a shared slice. So in general, splitting the work on a slice like this requires use of unsafe.

Usually, one goes about this by keeping the unsafe code minimal and encapsulating it inside of a safe abstraction, like your TrueIt and FalseIt. Common usage patterns, such as splitting the slice in fixed-size chunks or recursively breaking it in two parts, are natively supported by the standard library, so that normal applications do not need to use unsafe code. But under the hood, these abstractions are implemented using unsafe code.

Now, let us look at specific aspects of your implementation:


You do not need the vector which threads operate on to be a global variable. You only need it to live long enough so that your threads never end up holding a dangling pointer to it, and you need to make sure that the compiler knows.

The simplest way to do so is to use a scoped thread API, as available in crossbeam for example. These APIs guarantee to the compiler that threads spawned with a certain scope will be joined before the associated scope is exited. As a result, they let you use non-static data in your thread: any data which is longer-lived than the scope in which the thread was spawned is okay.
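Here is a minimal sketch of the scoped-thread idea. It uses `std::thread::scope` from the standard library (Rust 1.63 and later) rather than crossbeam, so that it has no dependencies; the principle is the same. The function name `scale_halves` is made up for illustration.

```rust
use std::thread;

/// Multiplies the first half of `v` by `left_factor` on the calling thread and
/// the second half by `right_factor` on a scoped child thread.
fn scale_halves(v: &mut [f32], left_factor: f32, right_factor: f32) {
    let mid = v.len() / 2;
    // split_at_mut hands out two non-overlapping mutable slices.
    let (left, right) = v.split_at_mut(mid);
    thread::scope(|s| {
        // The child borrows `right`, which is not 'static. This is accepted
        // because the scope joins the child before `scope` returns.
        s.spawn(move || {
            for x in right.iter_mut() {
                *x *= right_factor;
            }
        });
        for x in left.iter_mut() {
            *x *= left_factor;
        }
    });
}
```

The vector can then be an ordinary local variable: `let mut v = vec![1.0; n]; scale_halves(&mut v, 2.0, 3.0);`.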

In more complicated cases, you may need to put your vector in a reference-counted data block (Arc<T>) and hand over a reference to that data block to your thread. When you do this, you are making the threads responsible for freeing the data, so the borrow checker will also be happy: the data will not be freed before the thread accessing it completes its execution.
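A small sketch of the `Arc` route, with a made-up function name `parallel_sum`. Note that an `Arc<Vec<f32>>` only gives shared, read-only access, so this example sums instead of modifying; mutating through an `Arc` would additionally need a `Mutex`, atomics, or similar.

```rust
use std::sync::Arc;
use std::thread;

/// Sums the second half of the data on a child thread and the first half on
/// the calling thread. The Arc keeps the vector alive until both are done.
fn parallel_sum(data: Vec<f32>) -> f32 {
    let shared = Arc::new(data);
    let mid = shared.len() / 2;

    // The child gets its own reference-counted handle, so the data cannot be
    // freed while the child is still running.
    let for_child = Arc::clone(&shared);
    let child = thread::spawn(move || for_child[mid..].iter().sum::<f32>());

    let head: f32 = shared[..mid].iter().sum();
    head + child.join().expect("child thread panicked")
}
```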

And in simpler cases, you may get away with just moving data into a thread’s scope at the beginning of said thread’s execution, and moving it back to the client at the end (e.g. by returning it as part of the thread’s result).
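For that last case, a sketch could look like this (the function name `scale_in_thread` is made up for illustration). Ownership of the vector moves into the thread and comes back through the join handle.

```rust
use std::thread;

/// Moves `v` into a worker thread, scales it there, and moves it back out as
/// the thread's return value.
fn scale_in_thread(mut v: Vec<f32>, factor: f32) -> Vec<f32> {
    let handle = thread::spawn(move || {
        for x in v.iter_mut() {
            *x *= factor;
        }
        v // hand the vector back to whoever joins the thread
    });
    handle.join().expect("worker thread panicked")
}
```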


In your current implementation, creating a TrueIt or a FalseIt or accessing its members is unsafe, because a careless user could create two TrueIt pointing to the same data. Only the separate() function and the resulting iterators’ next() methods are safe. So in production use, you would want to put the definitions of TrueIt, FalseIt and separate() in a separate module, and manage visibility wisely so that users are only allowed to call separate() on a vector and to call next() on the resulting TrueIt and FalseIt.


The definitions of TrueIt, FalseIt, and separate() seem too complex considering what these components do. With unsafe code, one important concern is to keep things as simple as possible. Here are some ideas:

  • The code of your TrueIt and FalseIt is very similar. The two only differ in how they react to the output of f being true or false, so you could probably factor out a lot more of the commonalities. Ideally, you would only have one iterator type, of which separate() creates two instances with opposite selection functions.
  • You do not need to store the selection function “f” and its size parameter “aux” separately. With suitable modifications to the iterators and separate(), you could pack these together in a single closure object like |i| (i/size) % 2 == 0.
  • You can keep a pointer to the beginning of the Vec directly in the struct, instead of the casting dance that you currently perform. You just need to implement Send for the iterator afterwards, telling the compiler “okay, trust me, I have verified that this raw pointer can safely be passed between threads”.

Here’s what your code looks like once these changes are applied:

extern crate crossbeam;

use std::marker::PhantomData;

// Manipulating this struct is unsafe, so it should be put in a separate module
// to prevent the user from accessing the fields.
struct SelectIt<'a>
{
    // Ideally, we could just use a slice. But I could not think of a way of
    // doing so which wouldn't be UB in Rust due to mutable aliasing...
    vec_start: *mut f32,
    vec_len: usize,
    vec_lifetime: PhantomData<&'a f32>,

    // This is where the iterator is currently pointing
    cursor: usize,

    // This is the closure that determines which items we keep
    selector: &'a (Fn(usize) -> bool + Send + Sync),
    
    // We keep items when the closure returns this
    keep_value: bool,
}

// Iterating is safe if separate() has done its job correctly
impl<'a> Iterator for SelectIt<'a>
{
    type Item = &'a mut f32;
    fn next(&mut self) -> Option<&'a mut f32>
    {
        while self.cursor < self.vec_len
        {
            let old_cursor = self.cursor;
            self.cursor += 1;
            if (*self.selector)(old_cursor) == self.keep_value {
                return unsafe {
                    Some(&mut *self.vec_start.offset(old_cursor as isize))
                }
            }
        }
        None
    }
}

// We need this unsafe impl due to the raw pointer. It's okay, we have manually
// checked that races cannot result from using this pointer in other threads.
unsafe impl<'a> Send for SelectIt<'a> {}

// This layer of the abstraction is safe because it guarantees that the two
// selectors will not overlap.
fn separate<'a, F>(v:&'a mut Vec<f32>, selector: &'a F) -> 
(SelectIt<'a>, SelectIt<'a>)
    where F: Fn(usize) -> bool + Send + Sync + 'static
{
    let vec_start = v.as_mut_ptr();
    let vec_len = v.len();
    let cursor = 0;
    let tr = SelectIt {
        vec_start,
        vec_len,
        vec_lifetime: PhantomData,
        cursor,
        selector,
        keep_value: true,
    };
    let fa = SelectIt {
        vec_start,
        vec_len,
        vec_lifetime: PhantomData,
        cursor,
        selector,
        keep_value: false,
    };
    (tr,fa)
}


fn main()
{
    let k = 23;
    let n = 2usize.pow(k);
    let mut vec = vec![1.0; n];
    for iteration in 0..k
    {
        let size = 2usize.pow(iteration);
        let selector = move |i| (i/size) % 2 == 0;
        let (a, b) = separate(&mut vec, &selector);
        crossbeam::scope(|scope| {
            scope.spawn(move||{
                for elem in b
                {
                    *elem *= 3.0;
                }
            });
            for elem in a
            {
                *elem *= 2.0;
            }
        });
    }
    let total: f32 = vec.iter().sum();
    println!("total={}",total);
}

Note that there would be ways of simplifying SelectIt further so that it does not use dynamic dispatch and only contains a closure object (without the keep_value), but they all eventually boil down to returning an unboxed closure, which is complicated in current Rust. The stabilization of impl Trait should resolve this sometime this year.
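For readers on a toolchain where `impl Trait` in return position is available (it became stable in Rust 1.26), here is a sketch of what such a simplified version might look like. It is one possible shape, not a definitive design: the field names and the helper `scale_blocks` are made up, it uses `std::thread::scope` (Rust 1.63 and later) instead of crossbeam, and it assumes the predicate is a pure function of the index.

```rust
use std::marker::PhantomData;
use std::thread;

/// Yields mutable references to the elements whose index satisfies `keep`.
/// The fields should stay private to a module, as discussed above.
struct SelectIt<'a, F> {
    start: *mut f32,
    len: usize,
    cursor: usize,
    keep: F,
    _borrow: PhantomData<&'a mut [f32]>,
}

impl<'a, F: Fn(usize) -> bool> Iterator for SelectIt<'a, F> {
    type Item = &'a mut f32;

    fn next(&mut self) -> Option<&'a mut f32> {
        while self.cursor < self.len {
            let i = self.cursor;
            self.cursor += 1;
            if (self.keep)(i) {
                // SAFETY: i < len, and separate() gives the other iterator the
                // negated predicate, so index i is handed out only once.
                return Some(unsafe { &mut *self.start.add(i) });
            }
        }
        None
    }
}

// SAFETY: the two iterators built by separate() touch disjoint elements.
unsafe impl<'a, F: Send> Send for SelectIt<'a, F> {}

/// ASSUMPTION: `f` is pure (same answer every time for a given index);
/// otherwise the two iterators could overlap.
fn separate<'a, F>(
    v: &'a mut [f32],
    f: F,
) -> (
    SelectIt<'a, impl Fn(usize) -> bool + Send>,
    SelectIt<'a, impl Fn(usize) -> bool + Send>,
)
where
    F: Fn(usize) -> bool + Copy + Send,
{
    let (start, len) = (v.as_mut_ptr(), v.len());
    let yes = SelectIt { start, len, cursor: 0, keep: move |i: usize| f(i), _borrow: PhantomData };
    let no = SelectIt { start, len, cursor: 0, keep: move |i: usize| !f(i), _borrow: PhantomData };
    (yes, no)
}

/// One iteration of the original problem; `size` must be non-zero.
fn scale_blocks(v: &mut [f32], size: usize) {
    let (a, b) = separate(v, move |i: usize| (i / size) % 2 == 0);
    thread::scope(|s| {
        s.spawn(move || for x in b { *x *= 3.0; });
        for x in a { *x *= 2.0; }
    });
}
```

The `keep_value` flag and the `&dyn Fn` indirection are gone: each iterator carries its own closure, and the compiler can inline the predicate into `next()`.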


#7

With chunking, split_at_mut() or just Rayon’s parallel iterators should be enough.

Within a single thread, for trivial types, you can avoid the borrow checker by wrapping values in Cell.
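A sketch of the single-threaded `Cell` trick, using `Cell::from_mut` and `as_slice_of_cells` from the standard library. The function name `scale_with_cells` is made up for illustration.

```rust
use std::cell::Cell;

/// Single-threaded version of one iteration: two overlapping "views" of the
/// same slice exist at the same time, which plain `&mut` would not allow.
/// `size` must be non-zero.
fn scale_with_cells(v: &mut [f32], size: usize) {
    // Reinterpret the exclusive borrow as a shared slice of cells.
    let cells: &[Cell<f32>] = Cell::from_mut(v).as_slice_of_cells();

    // Both iterators borrow the whole slice; neither needs `&mut`.
    let selected = cells.iter().enumerate().filter(|(i, _)| (i / size) % 2 == 0);
    let others = cells.iter().enumerate().filter(|(i, _)| (i / size) % 2 != 0);

    for (_, c) in selected {
        c.set(c.get() * 2.0);
    }
    for (_, c) in others {
        c.set(c.get() * 3.0);
    }
}
```

`Cell` is not `Sync`, so the compiler rejects any attempt to share `cells` with another thread.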

But for multi-threaded arbitrary access there’s nothing apart from Atomic* types, because there’s no way to explain to the compiler that a complex usage pattern is safe.
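To illustrate the atomics route: the standard library has no atomic `f32`, so this sketch stores each float's bit pattern in an `AtomicU32` (via `f32::to_bits` and `f32::from_bits`). The function names are made up, and `std::thread::scope` needs Rust 1.63 or later. `Relaxed` ordering is used on the assumption that the two threads touch disjoint indices and are joined before the results are read.

```rust
use std::sync::atomic::{AtomicU32, Ordering};
use std::thread;

/// Multiplies by `factor` every element whose block parity matches `keep_even`.
/// Each element is an f32 stored as its bit pattern. `size` must be non-zero.
fn scale_selected(bits: &[AtomicU32], size: usize, keep_even: bool, factor: f32) {
    for (i, cell) in bits.iter().enumerate() {
        if ((i / size) % 2 == 0) == keep_even {
            let x = f32::from_bits(cell.load(Ordering::Relaxed));
            cell.store((x * factor).to_bits(), Ordering::Relaxed);
        }
    }
}

/// One iteration of the original problem, entirely in safe code: both threads
/// share the same `&[AtomicU32]`.
fn scale_blocks_atomic(bits: &[AtomicU32], size: usize) {
    thread::scope(|s| {
        s.spawn(|| scale_selected(bits, size, false, 3.0));
        scale_selected(bits, size, true, 2.0);
    });
}
```

The price is that the data has to live as `AtomicU32` values instead of plain `f32`, which tends to get in the way of vectorization.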

For complex patterns, either avoid them and use another split, or create your own abstraction using unsafe that exposes a safe interface (such as a pair of iterators that don’t overlap).


#8

If you don’t want the full iterators for some reason, you can also split_at_mut() and then use rayon::join to process the two parts.
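A sketch of that recursive pattern. To stay dependency-free, `join` below is a hypothetical stand-in for `rayon::join` built on `std::thread::scope` (Rust 1.63 and later); unlike rayon it spawns a fresh OS thread per split instead of using a thread pool, so the `threshold` parameter (also made up) should be kept large.

```rust
use std::thread;

/// Hypothetical stand-in for `rayon::join`: runs `b` on a scoped thread and
/// `a` on the calling thread, returning once both are done.
fn join<A, B>(a: A, b: B)
where
    A: FnOnce() + Send,
    B: FnOnce() + Send,
{
    thread::scope(|s| {
        s.spawn(b);
        a();
    });
}

/// Recursively halves `v` with split_at_mut until the pieces are small, then
/// processes each piece sequentially. `offset` is the global index of `v[0]`,
/// and `size` must be non-zero.
fn scale_recursive(v: &mut [f32], offset: usize, size: usize, threshold: usize) {
    if v.len() <= threshold.max(1) {
        for (local, x) in v.iter_mut().enumerate() {
            let i = offset + local;
            *x *= if (i / size) % 2 == 0 { 2.0 } else { 3.0 };
        }
        return;
    }
    let mid = v.len() / 2;
    let (left, right) = v.split_at_mut(mid);
    join(
        || scale_recursive(left, offset, size, threshold),
        || scale_recursive(right, offset + mid, size, threshold),
    );
}
```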


#9

Thank you people! Very constructive comments. To me it seemed natural to ask to separate a vector v by a set S into v[S] and v[complement(S)]. I suppose most applications do not need such generality and the developers have not seen it necessary to include it.

@jonh Sorry if I appeared to be nagging. I had in mind the theoretical problem of these kinds of separations and devised a programming problem to try to illustrate it. Surely it could have been done better.

I have to say that I am very happy with this language. I have programmed in C and Coq, among other languages, and Rust has very nice similarities to both. It is as safe and expressive as Coq and (almost) as efficient as C.


#10

Nothing to apologise for. I was using a different definition of nagging.
It’s hard to come up with the perfect example problem; writing down any (within reason) is better than silence.