A HashMap-like structure that can be “mutated” from multiple threads

I’m trying to write a structure, a PageManager, that can hold up to a fixed number of pages, each about 4K in size. I want to be able to access both the page manager and the pages stored within it from multiple threads. A page can either be shared with multiple threads for reading or handed to one thread for writing, which is the standard Rust borrowing rule.

The page manager does four things: you can insert a page, remove a page, and get a read or a write reference to any of the pages in the page manager. I have come up with a solution that works (I think), but I have this feeling there must be a better way to do this.

I have tried just storing the pages in a HashMap but I couldn’t get that to work. Having a mutable reference to the page manager would let me insert or remove pages, but I think I can only do that once I have dropped all read and write locks on all pages. Wrapping the HashMap in a RwLock would not let me return the stored page but instead gave me error E0515, cannot return value referencing temporary value.

What I came up with was to allocate a vector that can store a fixed number of pages and fill all slots with empty pages. When inserting a page I find the first empty slot, or a page that is no longer in use, and replace the page currently in the slot with the new page. Removing a page means replacing the page currently in the slot with an empty page. Now I can use a HashMap to store which page id points to which index in the vector, since the usize can be copied.

Here is a bare bones version of what I did.

I guess my question is how should I be doing this?

use std::collections::HashMap;
use std::sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard};
use std::time::Duration;

struct Page {
    buffer: Vec<u8>
}

impl Page {
    fn new(size: usize) -> Self {
        Page {
            buffer: vec![0; size]
        }
    }
}

struct PageManger {
    page_map: RwLock<HashMap<u64, usize>>,
    /// The pages are stored in a fixed sized vector which will allow us to insert a page into an
    /// empty slot and remove a page as long as the page isn't in use.
    pages: Vec<Arc<RwLock<Page>>>
}

impl PageManger {
    fn new(capacity: usize) -> Self {
        let mut pages = Vec::new();
        for _ in 0..capacity {
            pages.push(Arc::new(RwLock::new(Page::new(0))));
        }
        PageManger {
            page_map: RwLock::new(HashMap::new()),
            pages
        }
    }

    fn insert_page(&self, page_id: u64, page: Page) {
        // First we try and insert the page into an empty slot.
        for (i, p) in self.pages.iter().enumerate() {
            match p.try_write() {
                Ok(mut marker) => {
                    if !marker.buffer.is_empty() {
                        continue;
                    }

                    *marker = page;
                    let mut page_map = self.page_map.write().unwrap();
                    page_map.insert(page_id, i);
                    return;
                }
                Err(_) => continue, // slot is locked by someone else, try the next one
            }
        }

        // If we didn't find an empty slot we replace the first page that allow us to get a write lock.
        for (i, p) in self.pages.iter().enumerate() {
            match p.try_write() {
                Ok(mut marker) => {
                    *marker = page;
                    let mut page_map = self.page_map.write().unwrap();
                    page_map.insert(page_id, i);
                    return;
                }
                Err(_) => continue, // slot is locked by someone else, try the next one
            }
        }

        // TODO: Error handling
    }

    fn remove_page(&self, page_id: u64) {
        let mut marker = self.get_page_mut(page_id);
        *marker = Page::new(0);
    }

    fn get_page(&self, page_id: u64) -> RwLockReadGuard<'_, Page> {
        let pos = self.page_map.read().unwrap().get(&page_id).copied().unwrap();
        self.pages[pos].read().unwrap()
    }

    fn get_page_mut(&self, page_id: u64) -> RwLockWriteGuard<'_, Page> {
        let pos = self.page_map.read().unwrap().get(&page_id).copied().unwrap();
        self.pages[pos].write().unwrap()
    }
}

fn main() {
    let page_manager = PageManger::new(3);
    page_manager.insert_page(100, Page::new(64));
    page_manager.insert_page(200, Page::new(64));
    let page_manager = Arc::new(page_manager);

    let mut handles = Vec::new();

    let pm = page_manager.clone();
    handles.push(std::thread::spawn(move ||{
        let p0 = pm.get_page(100);
        std::thread::sleep(Duration::from_millis(1500));
    }));

    let pm = page_manager.clone();
    handles.push(std::thread::spawn(move ||{
        let mut p1 = pm.get_page_mut(200);
        std::thread::sleep(Duration::from_millis(500));
        p1.buffer[0] = 1;
    }));

    let pm = page_manager.clone();
    handles.push(std::thread::spawn(move ||{
        std::thread::sleep(Duration::from_millis(100));
        let p1 = pm.get_page(200);
        assert_eq!(p1.buffer[0], 1);
    }));

    let pm = page_manager.clone();
    handles.push(std::thread::spawn(move ||{
        std::thread::sleep(Duration::from_millis(10));
        pm.remove_page(100);
        pm.insert_page(100, Page::new(4096));
    }));


    for handle in handles {
        handle.join().unwrap();
    }

}

This type should work:

page_map: RwLock<HashMap<u64, Arc<RwLock<Page>>>>,

I tried doing that but I could no longer get a page out of the page manager.

pub fn get_page(&self, page_id: u64) -> Option<RwLockReadGuard<'_, Page>> {
    let page_map = self.page_map.read().unwrap();
    page_map.get(&page_id).map(|page| page.read().unwrap())
}

leads to this

error[E0515]: cannot return value referencing local variable `page_map`
  --> src/main.rs:41:9
   |
41 |         page_map.get(&page_id).map(|page| page.read().unwrap())
   |         ----------------------^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   |         |
   |         returns a value referencing data owned by the current function
   |         `page_map` is borrowed here

Ah, well. You can return an Arc<RwLock<Page>> by cloning the Arc, but it's true that this won't let you return an RwLockReadGuard directly.

I believe get_page and get_page_mut have race conditions.

If a preemption occurs between the two lines of code and insert_page (or maybe remove_page) is called by another thread during that time it's possible for the page_id to be invalidated. Part of the problem is that the page is not locked at that moment. Without holding a lock, insert_page has an opportunity to repurpose the slot. You would have a slot index (pos) to a slot that changed in between those two lines of code.
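
The window can be replayed deterministically on a single thread by running the two steps of get_page by hand and letting the competing remove and insert happen in between. This is only a sketch: SlotManager, lookup, first_byte and stale_read are made-up names, pages are reduced to plain byte vectors, and slot reuse is simplified to "first empty slot".

```rust
use std::collections::HashMap;
use std::sync::RwLock;

/// Simplified stand-in for the slot-based page manager (hypothetical names).
struct SlotManager {
    page_map: RwLock<HashMap<u64, usize>>,
    slots: Vec<RwLock<Vec<u8>>>,
}

impl SlotManager {
    fn new(capacity: usize) -> Self {
        SlotManager {
            page_map: RwLock::new(HashMap::new()),
            slots: (0..capacity).map(|_| RwLock::new(Vec::new())).collect(),
        }
    }

    /// Step 1 of `get_page`: resolve the page id to a slot index.
    fn lookup(&self, page_id: u64) -> Option<usize> {
        self.page_map.read().unwrap().get(&page_id).copied()
    }

    /// Step 2 of `get_page`: lock the slot and copy out its first byte.
    fn first_byte(&self, pos: usize) -> Option<u8> {
        self.slots[pos].read().unwrap().first().copied()
    }

    /// Puts `data` into the first empty slot and returns the slot index.
    fn insert(&self, page_id: u64, data: Vec<u8>) -> Option<usize> {
        for (i, slot) in self.slots.iter().enumerate() {
            let mut guard = slot.write().unwrap();
            if guard.is_empty() {
                *guard = data;
                self.page_map.write().unwrap().insert(page_id, i);
                return Some(i);
            }
        }
        None
    }

    /// Forgets the id, then empties the slot it pointed to.
    fn remove(&self, page_id: u64) {
        let pos = self.page_map.write().unwrap().remove(&page_id);
        if let Some(pos) = pos {
            self.slots[pos].write().unwrap().clear();
        }
    }
}

/// Replays the interleaving and returns the byte a reader of page 100 ends up seeing.
fn stale_read() -> Option<u8> {
    let pm = SlotManager::new(1);
    pm.insert(100, vec![1; 4]);
    let pos = pm.lookup(100)?; // the reader resolves the index...
    pm.remove(100); // ...another thread removes page 100...
    pm.insert(300, vec![3; 4]); // ...and the slot is reused for page 300
    pm.first_byte(pos) // the reader now locks the slot and reads page 300's data
}
```

Calling stale_read() hands back the first byte of page 300 even though the reader asked for page 100, which is the stale-index situation described above.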

I think this is the mistaken idea. What works is to get an Arc from the page manager (something that is read-only and can be shared), and when you want to mutate a page, give the page manager the new page. Arc has a nice function, get_mut.

For comparison, my page manager is here: rustdb::pstore

Yes, that is a good point. I guess I could add some kind of lock on the page id as well, but there must be a better way than just throwing more locks at the problem :).

Using the type I originally suggested avoids all of these issues with races because the Arc is cloned before the outer lock is released.

Yes, thank you. At first I had some problems with a deadlock when removing a page but that was just me doing things in the wrong order. Here is what I ended up with.

struct PageManger {
    page_map: RwLock<HashMap<u64, Arc<RwLock<Page>>>>,
}

impl PageManger {
    fn new() -> Self {
        PageManger {
            page_map: RwLock::new(HashMap::new()),
        }
    }

    fn insert_page(&self, page_id: u64, page: Page) {
        let mut page_map = self.page_map.write().unwrap();
        page_map.insert(page_id, Arc::new(RwLock::new(page)));
    }

    fn remove_page(&self, page_id: u64) {
        // Get a write lock on the page to make sure no one is currently accessing it.
        let page = self.get_page(page_id).unwrap();
        let _ = page.write().unwrap();
        let mut page_map = self.page_map.write().unwrap();
        page_map.remove(&page_id);
    }

    fn get_page(&self, page_id: u64) -> Option<Arc<RwLock<Page>>> {
        let page_map = self.page_map.read().unwrap();
        let page = page_map.get(&page_id)?;
        Some(page.clone())
    }
}

Yes, returning an Arc instead of the acquired lock seems to work. I think RwLock::write() does pretty much the same thing as Arc::get_mut, but RwLock::write() blocks the current thread until the lock is acquired, whereas Arc::get_mut returns None if someone else holds a reference to the page.
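
A small sketch of the difference, assuming nothing beyond the standard library (the function names are made up for illustration). Arc::get_mut looks at how many handles exist, while RwLock only cares about which guards are currently held. try_write is used instead of write so that the example never blocks.

```rust
use std::sync::{Arc, RwLock};

/// Returns (get_mut succeeded while a clone exists, get_mut succeeded after dropping it).
fn get_mut_outcomes() -> (bool, bool) {
    let mut page = Arc::new(vec![0u8; 4]);
    let reader = Arc::clone(&page);
    // A second handle exists, so exclusive access is refused even though nobody is reading.
    let while_shared = Arc::get_mut(&mut page).is_some();
    drop(reader);
    let when_unique = Arc::get_mut(&mut page).is_some();
    (while_shared, when_unique)
}

/// Returns (try_write succeeded while a read guard is held, try_write succeeded afterwards).
fn try_write_outcomes() -> (bool, bool) {
    let page = Arc::new(RwLock::new(vec![0u8; 4]));
    let other = Arc::clone(&page);
    let read_guard = other.read().unwrap();
    let while_read = page.try_write().is_ok();
    drop(read_guard);
    // The second handle `other` is still alive, but no guard is held any more.
    let after_read = page.try_write().is_ok();
    (while_read, after_read)
}
```

So with Arc alone, merely keeping a clone around prevents mutation, whereas with the RwLock inside the Arc only an active guard does.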

I came across this crate, perhaps you may find it useful.

Some observations...

  • The new version is unbounded (can consume all available memory).
  • insert_page replaces an existing Page. Anything holding a reference to the old Page will be unaware of the replacement until the next call to get_page.
  • I believe it's possible for something to get a reference to a Page that is removed (via remove_page) before that something tries to lock the Page. In other words, PageManager clients could be holding an orphaned Page with no indication that it's been orphaned by a third party calling remove_page. If I'm correct then let _ = page.write().unwrap(); is somewhere between marginally useful and useless.
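
A related detail about that line, shown as a hedged sketch with a made-up function name: a `let _ = ...` statement does not bind the guard, so the write lock is released at the end of that statement rather than at the end of remove_page. Newer compilers flag this pattern with the let_underscore_lock lint, which is why the sketch allows it explicitly.

```rust
use std::sync::RwLock;

/// Returns whether a reader could get in (after `let _`, while `_guard` is held).
#[allow(let_underscore_lock)] // the first binding below is the pattern under discussion
fn guard_lifetimes() -> (bool, bool) {
    let page = RwLock::new(0u8);

    // `let _ = ...` does not bind the guard, so it is dropped at the end of this statement.
    let _ = page.write().unwrap();
    let readable_after_underscore = page.try_read().is_ok();

    // A named binding keeps the guard alive until the end of the enclosing scope.
    let _guard = page.write().unwrap();
    let readable_while_held = page.try_read().is_ok();

    (readable_after_underscore, readable_while_held)
}
```

Binding the guard to a name such as `_guard` would hold the lock while the map entry is removed, although it still would not tell holders of an already cloned Arc that the page is gone.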

Those are good points. This is not the final version, more of a proof of concept. I will limit the number of pages you can store but that should be quite trivial as I can just look at the length of the hash map. Regarding insert_page, the logic in the rest of the program should not try to insert a new page if it's in there but I should definitely make sure that can't happen.
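
One way the two fixes mentioned here could look, as a sketch rather than the final design: BoundedPageManager, InsertError and page_size are hypothetical names, and returning an error when the map is full (instead of evicting something) is an assumption.

```rust
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

struct Page {
    buffer: Vec<u8>,
}

#[derive(Debug, PartialEq)]
enum InsertError {
    Full,
    AlreadyPresent,
}

struct BoundedPageManager {
    capacity: usize,
    page_map: RwLock<HashMap<u64, Arc<RwLock<Page>>>>,
}

impl BoundedPageManager {
    fn new(capacity: usize) -> Self {
        BoundedPageManager { capacity, page_map: RwLock::new(HashMap::new()) }
    }

    /// Inserts only if the id is new and there is room; both checks happen
    /// under the same write lock, so no other insert can slip in between.
    fn insert_page(&self, page_id: u64, page: Page) -> Result<(), InsertError> {
        let mut page_map = self.page_map.write().unwrap();
        let len = page_map.len();
        match page_map.entry(page_id) {
            Entry::Occupied(_) => Err(InsertError::AlreadyPresent),
            Entry::Vacant(_) if len >= self.capacity => Err(InsertError::Full),
            Entry::Vacant(slot) => {
                slot.insert(Arc::new(RwLock::new(page)));
                Ok(())
            }
        }
    }

    /// Returns the buffer length of a stored page, if the id is known.
    fn page_size(&self, page_id: u64) -> Option<usize> {
        let page = self.page_map.read().unwrap().get(&page_id)?.clone();
        let size = page.read().unwrap().buffer.len();
        Some(size)
    }
}
```

With a capacity of 1, a second insert under the same id reports AlreadyPresent and an insert under a new id reports Full, so callers have to decide explicitly what to do in either case.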

The remove page issue seems a bit harder to solve. With the current approach, getting a page looks like this.

let page = page_manager.get_page(100).unwrap();
// <<<< preempted here
let page = page.read().unwrap();

Which would lead to the scenario you are pointing out. One possible solution could be to add a flag to the page struct which would indicate the page has been evicted. It seems like that should work, seeing as I need a write lock on the page to be able to remove it.
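
A sketch of the flag idea, under the assumption that the page is taken out of the map first and then marked while its write lock is held. The evicted field, FlaggedPageManager and read_first_byte are hypothetical names, not part of the code above.

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

struct Page {
    buffer: Vec<u8>,
    /// Hypothetical flag, set by `remove_page` while it holds the write lock.
    evicted: bool,
}

struct FlaggedPageManager {
    page_map: RwLock<HashMap<u64, Arc<RwLock<Page>>>>,
}

impl FlaggedPageManager {
    fn new() -> Self {
        FlaggedPageManager { page_map: RwLock::new(HashMap::new()) }
    }

    fn insert_page(&self, page_id: u64, buffer: Vec<u8>) {
        let page = Page { buffer, evicted: false };
        self.page_map.write().unwrap().insert(page_id, Arc::new(RwLock::new(page)));
    }

    fn get_page(&self, page_id: u64) -> Option<Arc<RwLock<Page>>> {
        self.page_map.read().unwrap().get(&page_id).cloned()
    }

    fn remove_page(&self, page_id: u64) {
        // Take the page out of the map first so that no new handles can be created.
        let removed = self.page_map.write().unwrap().remove(&page_id);
        if let Some(page) = removed {
            // Waits for current readers and writers to finish, then marks the page.
            page.write().unwrap().evicted = true;
        }
    }
}

/// Reads the first byte unless the page was evicted after the handle was obtained.
fn read_first_byte(page: &RwLock<Page>) -> Option<u8> {
    let guard = page.read().unwrap();
    if guard.evicted {
        return None;
    }
    guard.buffer.first().copied()
}
```

A caller that obtained its Arc before remove_page ran still owns a valid page, but the check after locking tells it that the page is no longer managed, so it can go back to the page manager instead of using stale data.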

Thank you for pointing out the issues. I don't generally write multithreaded code, and have just started with Rust, so I really appreciate the feedback.

chashmap looks pretty nifty. I haven't looked into how it works but having a lock per bucket rather than one lock on the entire hash map should improve performance. I will check it out.
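
To illustrate the general idea of splitting one big lock into several, here is a hedged sketch of a map divided into independently locked shards. It makes no claim about how chashmap itself is implemented; ShardedMap and its methods are made up for the example, and the shard is picked with the standard library's DefaultHasher.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};
use std::sync::RwLock;

/// Hypothetical sharded map: every shard is a HashMap behind its own RwLock.
struct ShardedMap<V> {
    shards: Vec<RwLock<HashMap<u64, V>>>,
}

impl<V: Clone> ShardedMap<V> {
    fn new(shard_count: usize) -> Self {
        assert!(shard_count > 0, "at least one shard is required");
        ShardedMap {
            shards: (0..shard_count).map(|_| RwLock::new(HashMap::new())).collect(),
        }
    }

    /// Maps a key to the shard responsible for it.
    fn shard_index(&self, key: u64) -> usize {
        let mut hasher = DefaultHasher::new();
        key.hash(&mut hasher);
        (hasher.finish() % self.shards.len() as u64) as usize
    }

    /// Locks only the shard that owns `key`; other shards stay available.
    fn insert(&self, key: u64, value: V) -> Option<V> {
        self.shards[self.shard_index(key)].write().unwrap().insert(key, value)
    }

    /// Returns a clone of the stored value so that no lock guard escapes.
    fn get(&self, key: u64) -> Option<V> {
        self.shards[self.shard_index(key)].read().unwrap().get(&key).cloned()
    }
}
```

With V set to Arc<RwLock<Page>>, a writer inserting one page would only block lookups that happen to hash to the same shard.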