Writing to the same array across threads

Hi,

Something I often do in C/C++ is have a number of threads write to the same array. Each thread bumps a single atomic counter by the number of items it wants to write out, and that tells it where in the array to put them.

I wonder if there is a way to do this safely in Rust? I could of course let the threads write out to separate arrays and merge them once all the threads are done, but that would add overhead from memcpy-ing the data on a single thread.

Cheers!

Perhaps the std::slice::split_at_mut(..) method could be useful for you.
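A minimal sketch of that suggestion, assuming a recent stable compiler where `std::thread::scope` is available. The helper name `fill_halves` and the values written are made up for illustration. The slice is split into two non-overlapping halves and each half goes to its own thread, so no `unsafe` is needed:

```rust
use std::thread;

// Hypothetical helper: each half of `data` is filled by a separate thread.
fn fill_halves(data: &mut [u32]) {
    let mid = data.len() / 2;
    // The two returned slices never overlap, so both can be mutated at once.
    let (left, right) = data.split_at_mut(mid);

    // Scoped threads may borrow local data; all of them are joined
    // before `scope` returns.
    thread::scope(|s| {
        s.spawn(move || {
            for (i, slot) in left.iter_mut().enumerate() {
                *slot = i as u32;
            }
        });
        s.spawn(move || {
            for (i, slot) in right.iter_mut().enumerate() {
                *slot = 100 + i as u32;
            }
        });
    });
}

fn main() {
    let mut data = vec![0u32; 8];
    fill_halves(&mut data);
    println!("{:?}", data);
}
```

The catch is that the split points have to be known up front, which is not quite the same as letting threads claim slots on demand through a counter.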


Thanks! I will take a look at it.

Here's something I threw together that might be what you want. Note that it does depend on an unstable feature in order to "unwrap" the Arc; you could get around this with an extra unsafe method that swapped the inner slice out.

Edit: I should also make clear that I don't really understand the implications of the various ordering modes, so I've gone for what I believe to be the most conservative.

Edit 2: Oh, and you should be able to extend this with a push_all or extend, by just bumping the offset by more than 1.


#![feature(arc_unique)]

use std::cell::UnsafeCell;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;

pub struct AppendVec<T> {
    vec: UnsafeCell<Box<[T]>>,
    off: AtomicUsize,
}

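// Safety: `T: Send` is required because a value pushed by one thread is
// later read or dropped by another.
unsafe impl<T: Send> std::marker::Sync for AppendVec<T> {}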

impl<T> AppendVec<T> where T: Clone {
    pub fn new(init: T, len: usize) -> Arc<Self> {
        // This is to just provide some assurance that we won't overflow the
        // offset counter.
        assert!(len < ::std::usize::MAX / 2);
        Arc::new(AppendVec {
            vec: UnsafeCell::new(vec![init; len].into_boxed_slice()),
            off: AtomicUsize::new(0),
        })
    }
}

impl<T> AppendVec<T> {
    pub fn push(&self, value: T) -> Result<(), T> {
        unsafe {
            let vec_len = (*self.vec.get()).len();
            let mut off = self.off.load(Ordering::SeqCst /* unsure */);

            while off < vec_len {
                let cur_off = self.off.compare_and_swap(off, off+1, Ordering::SeqCst /* unsure */);
                if cur_off != off {
                    // Someone else changed off; try again.
                    off = cur_off;
                } else {
                    *(*self.vec.get()).get_unchecked_mut(off) = value;
                    return Ok(());
                }
            }

            // Oh dear... out of room.
            Err(value)
        }
    }

    pub fn into_boxed_slice(self) -> (Box<[T]>, usize) {
        unsafe {
            (self.vec.into_inner(), self.off.load(Ordering::SeqCst /* unsure */))
        }
    }
}

fn main() {
    let vec = AppendVec::new(0, 64);
    let delays = &[2, 3, 5, 7, 11];
    let threads = delays.iter().cloned().enumerate()
        .map(|(i, d)| (i*100, d, vec.clone()))
        .map(|(off, delay, vec)| thread::spawn(move || {
            let mut off = off;
            loop {
                match vec.push(off+1) {
                    Ok(()) => {
                        off += 1;
                        thread::sleep_ms(delay);
                    },
                    Err(off) => {
                        println!("stopped at {}", off);
                        break;
                    }
                }
            }
        }))
        .collect::<Vec<_>>();
    
    for thread in threads {
        thread.join().unwrap();
    }
    
    match Arc::try_unwrap(vec).map(AppendVec::into_boxed_slice) {
        Ok((vec, off)) => {
            println!("Got {} results: {:?}", off, &vec[..off]);
        },
        Err(_) => {
            println!("Could not unwrap vec.");
        }
    }
}
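For comparison, here is a rough sketch of the same idea on current stable Rust, including the "bump the offset by more than 1" extension mentioned in Edit 2. It is not the code above: the names `AppendBuf`, `reserve`, `push_all` and `into_filled` are hypothetical, each slot gets its own `UnsafeCell` so a writer never holds a `&mut` to the whole slice, and `Relaxed` ordering is an assumption that relies on the threads being joined before the results are read.

```rust
use std::cell::UnsafeCell;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;

/// Hypothetical fixed-capacity buffer that threads append to concurrently.
pub struct AppendBuf<T> {
    slots: Box<[UnsafeCell<T>]>,
    next: AtomicUsize,
}

// Safety: `reserve` hands out every index at most once, so no two threads
// ever write the same slot. `T: Send` because values cross threads.
unsafe impl<T: Send> Sync for AppendBuf<T> {}

impl<T: Clone> AppendBuf<T> {
    pub fn new(init: T, len: usize) -> Self {
        let slots = (0..len).map(|_| UnsafeCell::new(init.clone())).collect();
        AppendBuf { slots, next: AtomicUsize::new(0) }
    }
}

impl<T> AppendBuf<T> {
    /// Claim `n` consecutive slots; returns the start index, or `None` if
    /// they do not fit. Assumption: `Relaxed` is enough because the counter
    /// only hands out indices; the data is published by joining the threads.
    fn reserve(&self, n: usize) -> Option<usize> {
        self.next
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |cur| {
                cur.checked_add(n).filter(|&end| end <= self.slots.len())
            })
            .ok()
    }

    /// Write all `items` into one reserved range, or hand them back if full.
    pub fn push_all(&self, items: Vec<T>) -> Result<(), Vec<T>> {
        match self.reserve(items.len()) {
            Some(start) => {
                for (i, item) in items.into_iter().enumerate() {
                    // Safety: this range belongs to this call only.
                    unsafe { *self.slots[start + i].get() = item; }
                }
                Ok(())
            }
            None => Err(items),
        }
    }

    /// Consume the buffer and return only the slots that were claimed.
    pub fn into_filled(self) -> Vec<T> {
        let len = self.next.into_inner();
        self.slots
            .into_vec()
            .into_iter()
            .take(len)
            .map(UnsafeCell::into_inner)
            .collect()
    }
}

fn main() {
    let buf = AppendBuf::new(0u32, 64);
    thread::scope(|s| {
        for t in 0..4u32 {
            let buf = &buf;
            s.spawn(move || {
                for i in 0..8 {
                    // Each call claims two slots with a single counter bump.
                    buf.push_all(vec![t * 100 + i, t * 100 + i]).unwrap();
                }
            });
        }
    });
    let out = buf.into_filled();
    println!("Got {} results: {:?}", out.len(), out);
}
```

Because `into_filled` takes `self` by value, it can only be called once no thread still borrows the buffer, which is what makes reading the slots afterwards safe.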

Thanks! I really appreciate the help.

Yeah that shouldn't be too hard. Thanks again.

OT: just make sure each thread's chunk of the array is aligned to the cache-line size (64 bytes on most processors), so that threads don't contend for the same cache line (false sharing).
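One way to get that alignment in Rust is `#[repr(align(64))]` on a wrapper type. This is only a sketch: the 64-byte figure is an assumption about the target CPU, and the names `Padded` and `fill` are made up for illustration.

```rust
use std::thread;

/// One chunk per thread. Assumption: 64-byte cache lines; 16 * 4 bytes of
/// payload fills exactly one line, so neighbouring chunks never share one.
#[repr(align(64))]
#[derive(Clone, Copy)]
struct Padded([u32; 16]);

// Hypothetical helper: every chunk is filled by its own thread.
fn fill(chunks: &mut [Padded]) {
    thread::scope(|s| {
        for (t, chunk) in chunks.iter_mut().enumerate() {
            s.spawn(move || {
                for (i, slot) in chunk.0.iter_mut().enumerate() {
                    *slot = (t * 100 + i) as u32;
                }
            });
        }
    });
}

fn main() {
    let mut chunks = vec![Padded([0; 16]); 4];
    fill(&mut chunks);
    println!("align = {}", std::mem::align_of::<Padded>());
    println!("chunk 2, slot 3 = {}", chunks[2].0[3]);
}
```

The allocator honours the type's alignment, so a `Vec<Padded>` starts on a 64-byte boundary and every element occupies a whole number of cache lines.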


Yeah, I'm aware of that, but thanks for pointing it out, as it is definitely important.


(Incidentally, the chunks_mut method, which returns an iterator, is a nicer alternative to calling split_at_mut in a loop when you want mutable views covering the entire array and every piece has the same length.)
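A small sketch of the `chunks_mut` approach, again assuming `std::thread::scope` from recent stable Rust. The helper name `fill_chunks` and the values written are hypothetical.

```rust
use std::thread;

// Hypothetical helper: one thread per chunk, each writing its chunk index.
// `chunk_len` must be non-zero, otherwise `chunks_mut` panics.
fn fill_chunks(data: &mut [u32], chunk_len: usize) {
    thread::scope(|s| {
        // `chunks_mut` yields non-overlapping mutable sub-slices; the last
        // one may be shorter if the length is not a multiple of `chunk_len`.
        for (t, chunk) in data.chunks_mut(chunk_len).enumerate() {
            s.spawn(move || {
                for slot in chunk.iter_mut() {
                    *slot = t as u32;
                }
            });
        }
    });
}

fn main() {
    let mut data = vec![0u32; 10];
    fill_chunks(&mut data, 4);
    println!("{:?}", data);
}
```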
