Multithreaded access to Arc without Mutex

Hi all,

I am new to Rust, but am enjoying it so far. Thanks for the work put into the community.

I have a data-structure problem that I would like more experienced people's opinions on, regarding complex "atomic" data types.

Essentially, what I would like is a double-buffer type that one thread writes to while the other thread reads from it.

My first implementation looks something like this:

pub struct DoubleBufT<const PRE_N: usize, const POST_N: usize> {
    buff_: [SingleBufT<PRE_N, POST_N>; 2],
    write_active_buffer_idx_: usize,
    read_active_buffer_idx_: usize,
}

impl<const PRE_N: usize, const POST_N: usize> DoubleBufT<PRE_N, POST_N> {
    pub fn default() -> Self{
        Self {
            buff_: core::array::from_fn(|_| SingleBufT::<PRE_N, POST_N>::default()),
            write_active_buffer_idx_: 0,
            read_active_buffer_idx_: 1,
        }
    }

    pub fn switch_buffer(&mut self) {
        self.write_active_buffer_idx_ = if self.write_active_buffer_idx_ == 0 {1} else {0};
        self.read_active_buffer_idx_ = if self.write_active_buffer_idx_ == 0 {1} else {0};
        self.buff_[self.write_active_buffer_idx_].set_full(false); // just switched buffers, it is empty now
    }

    pub fn write_to_buff(&mut self, all_angles_frame: Vec<Mat>){
        self.buff_[self.write_active_buffer_idx_].write_to_buff(all_angles_frame);
    }

    pub fn read_from_buff(&self, frame_nr: usize) -> (Vec<Mat>, usize, usize) {
        let buf = &self.buff_[self.read_active_buffer_idx_];
        let frames = buf.get_ref_to_frame(frame_nr).to_vec();
        let special_frame_nr = buf.get_current_pre_capacity();
        let num_frames_in_seq = buf.get_current_pre_capacity() + buf.get_current_post_capacity();
        (frames, special_frame_nr, num_frames_in_seq)
    }
}

where access to the buffers happens through an Arc<Mutex<DoubleBufT>>.

However, I have realised that this means the write thread will block the read thread and vice versa, since the lock has to be held for both reading and writing.

What I would prefer is that a mutex is contained internally within the DoubleBufT, and grabbed only when switching the buffer indices or when reading.

I have attempted to modify the struct above to contain such a mutex and lock it only when switching and reading. However, when I create an Arc instead of an Arc<Mutex<...>>, the compiler (understandably) tells me that the DerefMut trait is not implemented for my DoubleBufT.

Other posts, such as Trait `DerefMut` is required to modify through a dereference, but it is not implemented for `Arc<_>` - #2 by alice, recommend either a mutex (presumably external) or the actor pattern. Is there no third option?

Thanks for reading!

If I understand your code correctly, the only data that needs locking is the buffer indices, and you want to be able to do both read-locking and write-locking on them.

First, a code improvement: write_active_buffer_idx_ can only be 0 or 1, and read_active_buffer_idx_ is always the other one. You can therefore replace both indices with a single bool that is true when write_active_buffer_idx_ == 0 && read_active_buffer_idx_ == 1 and false when write_active_buffer_idx_ == 1 && read_active_buffer_idx_ == 0 (or vice versa).

Then, simply wrap this bool in a RwLock and lock it as appropriate before operating on it.

EDIT: Ah, wait, you still can't wrap the resulting DoubleBufT in an Arc, because switch_buffer() and write_to_buff() require mut access. Could you please show your attempt at using an inner mutex so I can see exactly what went wrong there?

Hi, thanks for taking the time to help!

My modifications would look something like this:

impl<const PRE_N: usize, const POST_N: usize> DoubleBufWithMutexT<PRE_N, POST_N> {
    pub fn default() -> Self{
        Self {
            buff_: core::array::from_fn(|_| SingleBufT::<PRE_N, POST_N>::default()), //wtf
            write_active_buffer_idx_: 0,
            read_active_buffer_idx_: 1,

            internal_buff_mutex_: Arc::new(Mutex::new(false)) //new!
        }
    }

Now, I would attempt to lock via this mutex when either switching the buffers, or when reading. The write thread is in control of switching the buffers, so no need to worry about locking write access when switching buffers...

So the read and switch_buffer functions would now look something like this:

    pub fn switch_buffer(&mut self) {
        self.write_active_buffer_idx_ = if self.write_active_buffer_idx_ == 0 {1} else {0};
        {
            let _lock = self.internal_buff_mutex_.lock().unwrap(); // new!
            self.read_active_buffer_idx_ = if self.write_active_buffer_idx_ == 0 {1} else {0};
        }
        self.buff_[self.write_active_buffer_idx_].set_full(false); // just switched buffers, it is empty now
    }

And the read function would use the same lock. The idea is that switching buffers hardly ever happens, so threads should rarely block, because the mutex is held by the read thread most of the time.

However, a simple test reveals the issue:

#[test]
    fn test_db_mutex() -> Result<(), Box<dyn std::error::Error>> {
        // mutex is contained internally in the double buffer, so we can just create a
        // long-lived double buffer without externally handling mutexes...
        const PRE_CAPACITY: usize = 20;
        const POST_CAPACITY: usize = 20;
        let db: Arc<DoubleBufWithMutexT<PRE_CAPACITY, POST_CAPACITY>> =
            Arc::new(DoubleBufWithMutexT::<PRE_CAPACITY, POST_CAPACITY>::default());
        let db_clone_1 = Arc::clone(&db);
        let db_clone_2 = Arc::clone(&db);
        let write_thread = thread::spawn(move || {
            for i in 1..PRE_CAPACITY + 1 {
                let mat_1 = Mat::zeros(3, 3, CV_8UC3).expect("unwrap").to_mat();
                let mat_2 = Mat::zeros(3, 3, CV_8UC3).expect("unwrap").to_mat();
                println!("writing {} to pre-buffer", i);
                let mat_1 = mat_1.unwrap().set_to(&Scalar::all(i as f64), &Mat::default());
                let mat_2 = mat_2.unwrap().set_to(&Scalar::all(i as f64), &Mat::default());
                let all_angles_frame = vec![mat_1.expect("unwrap").clone(), mat_2.expect("unwrap")];
                db_clone_1.write_to_buff(all_angles_frame); // ERROR: cannot borrow data in an `Arc` as mutable
            }
        });
        write_thread.join().unwrap();
        Ok(())
    }

So this is what leads me to believe that I either need to implement DerefMut, or that there is probably a better solution for what I am trying to do...

If I understand correctly, something like Arc<RwLock<(bool, [RwLock<Buf>;2])>> would solve your access/synchronization pattern. You would take the outer write lock to switch buffers and outer read lock and appropriate inner lock to access a buffer.

This way buffer accesses would not block each other (unless they are write accesses on the same buffer) and switch would block everything else.

So applied to your code it would look like this:

pub struct DoubleBufT<const PRE_N: usize, const POST_N: usize> {
    // This changed
    buff_: [RwLock<SingleBufT<PRE_N, POST_N>>; 2],
    write_active_buffer_idx_: usize,
    read_active_buffer_idx_: usize,
}

impl<const PRE_N: usize, const POST_N: usize> DoubleBufT<PRE_N, POST_N> {
    pub fn switch_buffer(&mut self){
        // ...
    }

    pub fn write_to_buff(&self, all_angles_frame: Vec<Mat>){
        // Get an inner write lock on buffer
        self.buff_[self.write_active_buffer_idx_].write().unwrap().write_to_buff(all_angles_frame);
    }

    pub fn read_from_buff(&self, frame_nr: usize) -> (Vec<Mat>, usize, usize) {
        // ...
        // Get an inner read lock on buffer
        let guard = self.buff_[self.read_active_buffer_idx_].read().unwrap();
        // ...
    }
}

let db: Arc<RwLock<DoubleBufT<20, 20>>> = Arc::new(RwLock::new(DoubleBufT::<PRE_CAPACITY,POST_CAPACITY>::default()));

(complete example)


Hi,

Thanks for your suggestion! It seems I completely missed RwLock... it looks purpose-built for this!

I used your changes, and they fixed my DerefMut issue! Thanks.

Is there any reason you use read().unwrap() instead of write().unwrap() when writing to the buffer? You even seem to have emphasized read with underscores in your annotation...

I will read the RwLock reference and come back and edit if I realise why :)

It looks counterintuitive, but yes, there is a reason.

Note that I do this on the outer RwLock, i.e. db.read().unwrap().write_to_buff(...). Inside the write_to_buff method there is still a write().unwrap(), i.e. self.buff_[...].write().unwrap().write_to_buff(...);

This is so that writes to write buffer do not block reads from read buffer (as you wanted according to my understanding).

If you did db.write().unwrap().write_to_buff(...) you would be in a similar situation as you had with the Mutex: writers would block readers and vice versa.

You can think of it as the outer lock (the one around the DoubleBufT) protects only the state of the buffer switch (buffer indices) and "access path" to buffers, not the buffers themselves. The inner locks (the ones around the SingleBufT) protect the buffers (their contents).

So:

  • To switch buffers you need exclusive (i.e. write) access to the outer struct, to block off any buffer reads or writes (or another switch).
  • To write to the write buffer you need shared (i.e. read) access to the outer struct, so that reads and writes (to their respective buffers) can run concurrently. This still does not give you access to the buffer contents; for that you need exclusive access to the buffer itself, which is taken inside the write_to_buff method.
  • To read from the read buffer you need shared (i.e. read) access to the outer struct, again to allow concurrent reads and writes. This time you also want shared access to the buffer itself, so that multiple reads can run concurrently.

You can think of RwLock::read() as "get shared access" and RwLock::write() as "get exclusive access".


Check out the left_right crate.


Fantastic, thank you for the concise explanation, I consider this solved!

Will do, thanks!