Is anything obviously wrong with this multi-threaded code that is meant to allow non-blocking reads?

I want to modify a HashMap from one thread and read it from another. The reading thread is the UI thread, so I do not want to block waiting for a Mutex on reads. I've written the code below, since I was unable to find a well-used and/or well-explained way to handle this use case.

use std::collections::HashMap;
use std::sync::{Arc, Mutex};

type Map = HashMap<usize, u8>;
type Maps = [Mutex<Map>; 2];

pub fn new_handles(
    capacity: usize,
) -> (ReadHandle, WriteHandle) {
    let maps = Arc::new([
        Mutex::new(Map::with_capacity(capacity)),
        Mutex::new(Map::with_capacity(capacity)),
    ]);

    (
        ReadHandle {
            maps: maps.clone(),
            index: 0,
        },
        WriteHandle { maps },
    )
}

/// There should be at most one `WriteHandle` for a given backing store of maps.
pub struct WriteHandle {
    maps: Arc<Maps>,
}

impl WriteHandle {
    /// Applies `action` to both maps, retrying whichever one is currently locked.
    pub fn perform_write<F>(&mut self, action: F)
    where
        F: Fn(&mut Map),
    {
        // Just in case we somehow would loop forever otherwise
        let mut count = 0;
        let mut wrote_to_0 = false;
        let mut wrote_to_1 = false;
        while count < 16 && !(wrote_to_0 && wrote_to_1) {
            if !wrote_to_0 {
                if let Ok(mut map) = self.maps[0].try_lock() {
                    action(&mut map);
                    wrote_to_0 = true;
                }
            }
            if !wrote_to_1 {
                if let Ok(mut map) = self.maps[1].try_lock() {
                    action(&mut map);
                    wrote_to_1 = true;
                }
            }

            count += 1;
        }
        debug_assert!(wrote_to_0 && wrote_to_1);
    }
}

pub struct ReadHandle {
    maps: Arc<Maps>,
    index: u8,
}

impl ReadHandle {
    /// Returns a clone of whichever map can be locked without blocking.
    pub fn get(&mut self) -> Map {
        // Try the map we read from last time first.
        if let Ok(read_ref) = self.maps[self.index as usize].try_lock() {
            return read_ref.clone();
        }

        // That map is busy (presumably being written to), so flip to the other one.
        self.index += 1;
        self.index &= 1;

        if let Ok(read_ref) = self.maps[self.index as usize].try_lock() {
            return read_ref.clone();
        }

        debug_assert!(false, "neither of the maps was free to read!");
        Map::with_capacity(0)
    }
}

I wrote some tests to try to make sure that all the writes went through and that the debug_asserts did not trigger under expected use; the tests spawn multiple threads and send messages over channels. But the tests fail and succeed apparently at random. For example, if I compile a test binary containing multiple tests and don't filter any out, one of the tests fails, but if I filter down to just that test, it passes! This happens even when running with --test-threads=1, so I don't know how to narrow down what is causing the issue.

It’s hard to say without seeing exactly what the test expectations are, but this data structure appears to have nondeterministic read behavior. It attempts to write to the first map that it can lock, and reads attempt to do the same. Given any lock contention, you are guaranteeing scenarios where you cannot read your own writes.
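
Here is a minimal sketch of one such scenario, written as a hypothetical test (it assumes it sits in the same module as your code, since it pokes at the private maps field): the write has landed in maps[0], but the writer still holds that lock, so the reader is forced onto maps[1] and cannot see the write.

#[test]
fn a_reader_can_miss_a_committed_write() {
    let (mut reader, writer) = new_handles(4);

    // Simulate perform_write caught between the two maps: the action has
    // been applied to maps[0], the writer still holds that lock, and
    // maps[1] has not been updated yet.
    let mut mid_write = writer.maps[0].lock().unwrap();
    mid_write.insert(1, 42);

    // get() fails to lock maps[0], flips to maps[1], and returns a clone
    // of a map that has not seen the write: key 1 is missing.
    let snapshot = reader.get();
    assert_eq!(snapshot.get(&1), None);
}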

Have you considered an RwLock instead? It might not be exactly what you’re expecting.
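
For illustration, a rough sketch of that approach with std's RwLock (the wrapper and names here are mine, not from your code):

use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use std::thread;

fn main() {
    let map = Arc::new(RwLock::new(HashMap::<usize, u8>::new()));

    let writer = Arc::clone(&map);
    let handle = thread::spawn(move || {
        // Exclusive access: readers wait only while this guard is held.
        writer.write().unwrap().insert(1, 42);
    });
    handle.join().unwrap();

    // Shared access: any number of reads can proceed concurrently.
    let value = map.read().unwrap().get(&1).copied();
    assert_eq!(value, Some(42));
}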

Another alternative is using lock-free (eventually consistent) data structures like the ones provided by crossbeam or evmap.

Do you by any chance have many more reads than writes? In that case you could use a crossbeam channel to send changes to the reading thread; the reading thread would then drain that channel and apply the changes to its own copy of the hash map each time it reads.
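
Roughly like this, as a sketch (using crossbeam-channel; the Change enum and function name are made up for illustration):

use std::collections::HashMap;
use crossbeam_channel::Receiver;

enum Change {
    Insert(usize, u8),
    Remove(usize),
}

// Called by the reading thread at the start of each read. try_recv never
// blocks: it returns Err as soon as the channel is empty.
fn apply_pending(rx: &Receiver<Change>, map: &mut HashMap<usize, u8>) {
    while let Ok(change) = rx.try_recv() {
        match change {
            Change::Insert(k, v) => { map.insert(k, v); }
            Change::Remove(k) => { map.remove(&k); }
        }
    }
}

The reading thread owns the only HashMap, so reads are plain map lookups and never block; writes are just channel sends.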

From the RwLock documentation:

An RwLock will allow any number of readers to acquire the lock as long as a writer is not holding the lock.

That seems to imply that, unless I do something similar to what I have already done with Mutexes, an RwLock will still block reads whenever a writer holds the lock, so I don't think that will work.

I had considered evmap as a backup plan, but I was curious about how to do something like this myself. It also seems to me that both evmap and crossbeam are trying to handle many different use cases, and I figured that it should be possible to solve my fairly particular problem with something much simpler.

Ah! I do in fact have many more reads than writes. That technique seems pleasantly simple. If I bound the number of messages the reading thread will receive, then I think that should give me the non-blocking behavior I need.

I don't see a particular reason to use a crossbeam channel instead of a std::sync::mpsc channel, given I do not currently have a dependency on crossbeam. They claim to have better performance, but I do not expect the portion of my app managing this HashMap to be the bottleneck, given the reads never block.
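
For concreteness, this is the shape I have in mind with std::sync::mpsc (MAX_CHANGES_PER_READ is a placeholder name and value):

use std::collections::HashMap;
use std::sync::mpsc::Receiver;

const MAX_CHANGES_PER_READ: usize = 64;

// Bounding the drain means a burst of writes cannot stall a single read.
fn apply_some_pending(rx: &Receiver<(usize, u8)>, map: &mut HashMap<usize, u8>) {
    for _ in 0..MAX_CHANGES_PER_READ {
        match rx.try_recv() {
            Ok((k, v)) => { map.insert(k, v); }
            // Empty (or disconnected): nothing more to apply right now.
            Err(_) => break,
        }
    }
}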

Yeah, it was mostly because they're considered faster.