Condvar for RwLock?

I have a counter protected by Arc RwLock.

When the counter reaches a limit, I need to wait until it goes back to zero.

Another thread decrements the counter (it performs commits to update the database, but the number of outstanding commits needs to go to zero occasionally to allow memory to be freed up; see here).

It seems like std::sync::Condvar nearly does the job, but it works with std::sync::Mutex rather than std::sync::RwLock.

So, any ideas on how I should proceed? (I have a few vague ideas, like using a channel, but I am wondering if there is a better way.)
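For reference, here is a minimal sketch of the plain Condvar-with-Mutex pattern mentioned above, assuming the counter could be moved out of the RwLock into its own Mutex. The names Outstanding, done_one, wait_for_zero and run_demo are made up for illustration and are not taken from the actual code.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

/// Hypothetical shared state: an outstanding-commit counter plus a Condvar.
struct Outstanding {
    count: Mutex<u32>,
    zero: Condvar,
}

impl Outstanding {
    fn new(count: u32) -> Self {
        Self { count: Mutex::new(count), zero: Condvar::new() }
    }

    /// Called by the committing thread after each commit completes.
    fn done_one(&self) {
        let mut count = self.count.lock().unwrap();
        *count -= 1;
        if *count == 0 {
            self.zero.notify_all();
        }
    }

    /// Block until the counter is zero. The loop re-checks the condition,
    /// because Condvar::wait may return without a notification.
    fn wait_for_zero(&self) {
        let mut count = self.count.lock().unwrap();
        while *count != 0 {
            count = self.zero.wait(count).unwrap();
        }
    }
}

/// Simulate `n` commits on a worker thread, wait for all of them,
/// and return the final counter value.
fn run_demo(n: u32) -> u32 {
    let state = Arc::new(Outstanding::new(n));
    let worker_state = Arc::clone(&state);
    let worker = thread::spawn(move || {
        for _ in 0..n {
            worker_state.done_one();
        }
    });
    state.wait_for_zero();
    worker.join().unwrap();
    let final_count = *state.count.lock().unwrap();
    final_count
}

fn main() {
    println!("outstanding after wait: {}", run_demo(5));
}
```

The catch is exactly the one described in the question: Condvar::wait takes a MutexGuard, so this only fits if the counter lives behind a Mutex rather than the existing RwLock.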

If no other data is protected by the RwLock, consider using an AtomicU32 counter inside the Arc and combining it with the atomic_wait crate for wait/wake.

Hmm, I would prefer a solution not involving a crate. Also there is other data. I think perhaps a boolean in the RwLock can be set to indicate that the other thread is waiting, and if this is the case an (empty) message can be sent through a channel to wake it up when the counter goes to zero. I think that works smoothly although I am not quite sure!

[ I also slightly mis-stated the problem, the condition is not the counter reaching a limit, it is the number of commits since the counter last went to zero that needs to be limited ]

Well, I used a boolean and a channel. I think another way might be for one thread to hold a Mutex "long term" and release it at the appropriate time to allow the other thread to proceed, but the channel seems the more straightforward solution in terms of what I know and understand.
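Roughly, the boolean-plus-channel idea can be sketched as follows. This is a simplified stand-in, not the code linked below: Shared, done_one, wait_for_zero and run_demo are hypothetical names, and it assumes a single waiting client.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, RwLock};
use std::thread;

/// Hypothetical stand-in for the state behind the RwLock.
struct Shared {
    todo: u32,            // outstanding commits
    client_waiting: bool, // set by the client just before it blocks
}

/// Commit thread: finish one commit and wake the client if it is waiting.
fn done_one(shared: &RwLock<Shared>, wake: &Sender<()>) {
    let mut s = shared.write().unwrap();
    s.todo -= 1;
    if s.todo == 0 && s.client_waiting {
        s.client_waiting = false;
        // Ignore the error if the client has already gone away.
        let _ = wake.send(());
    }
}

/// Client: block until there are no outstanding commits.
fn wait_for_zero(shared: &RwLock<Shared>, woken: &Receiver<()>) {
    {
        let mut s = shared.write().unwrap();
        if s.todo == 0 {
            return;
        }
        s.client_waiting = true;
    } // The write lock is released here, before blocking on the channel.
    woken.recv().unwrap();
}

/// Simulate `n` commits on a worker thread and return the final counter value.
fn run_demo(n: u32) -> u32 {
    let shared = Arc::new(RwLock::new(Shared { todo: n, client_waiting: false }));
    let (wake, woken) = channel::<()>();
    let worker_shared = Arc::clone(&shared);
    let worker = thread::spawn(move || {
        for _ in 0..n {
            done_one(&worker_shared, &wake);
        }
    });
    wait_for_zero(&shared, &woken);
    worker.join().unwrap();
    let remaining = shared.read().unwrap().todo;
    remaining
}

fn main() {
    println!("outstanding after wait: {}", run_demo(5));
}
```

Because the flag is set and cleared only while holding the write lock, at most one empty message is sent per wait, so no stale wake-up is left in the channel.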

Code is here: atomfile.rs - source

Is it a thread that is just there for the given cleanup task? If so you can use thread::park and thread::unpark.


I didn't know about thread::park at all before! Yes, that looks like it could do the job nicely, thanks.

Here is the version using park/unpark.

impl Storage for AtomicFile {
    fn commit(&mut self, size: u64) {
        self.size = size;
        if self.map.map.is_empty() {
            return;
        }
        while {
            let cf = &mut self.cf.write().unwrap();
            // If the CommitFile map has got "large" wait for the commit process to finish (so the map is reset).
            if cf.wait(3000) {
                true
            } else {
                let map = std::mem::take(&mut self.map);
                cf.todo += 1;
                for (k, v) in map.map.iter() {
                    let start = k + 1 - v.len as u64;
                    cf.write_data(start, v.data.clone(), v.off, v.len);
                }
                self.tx.send((size, map)).unwrap();
                false
            }
        } {
            std::thread::park();
        }
    }
}

impl CommitFile {
    fn done_one(&mut self) {
        self.todo -= 1;
        if self.todo == 0 {
            self.map = WMap::default();
            if self.client_waiting {
                self.client_waiting = false;
                self.client.unpark();
            }
        }
    }
...

Full code here:

As documented for std::thread::park, be aware of spurious wakeups: the call to park may return before the thread is explicitly unparked. The example in the docs takes care of this subtle but important detail.


Yes, I don't really understand why there would be spurious wake-ups, but I did change an "if" to a "while" to allow for this.
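For anyone else reading, the "while instead of if" pattern looks roughly like this. It is a minimal sketch along the lines of the std docs example, with a hypothetical `done` flag standing in for the real condition (the counter reaching zero).

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

/// Park a thread until a flag is set, tolerating spurious wakeups.
/// Returns the flag value the waiting thread observed when it left the loop.
fn run_demo() -> bool {
    let done = Arc::new(AtomicBool::new(false));
    let done_for_waiter = Arc::clone(&done);

    let waiter = thread::spawn(move || {
        // Re-check the condition after every return from park():
        // park() is allowed to return without a matching unpark().
        while !done_for_waiter.load(Ordering::Acquire) {
            thread::park();
        }
        done_for_waiter.load(Ordering::Acquire)
    });

    // Simulate some work before the condition becomes true.
    thread::sleep(Duration::from_millis(10));

    // Set the condition first, then unpark the thread that is actually waiting.
    done.store(true, Ordering::Release);
    waiter.thread().unpark();

    waiter.join().unwrap()
}

fn main() {
    println!("waiter saw done = {}", run_demo());
}
```

Note that unpark has to be called on the handle of the thread that is actually waiting, which is why the sketch takes it from the JoinHandle rather than from whichever thread set things up.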

I also realised shortly after my previous post that I had falsely assumed the thread constructing AtomicFile would be the thread that used it, so I just fixed that. It does seem that, compared to using a channel, there are pitfalls. I am considering going back to the channel version on the grounds that it is perhaps the "safer" option.

Fixed version here: rustdb::atomfile - Rust

Ok... the park/unpark method, as well as being tricky, proved to be a bit inflexible when there are potentially multiple threads waiting for the database commits to complete.

What I have ended up using, and it seems quite neat, is an extra Mutex which doesn't protect any data (it is declared as Arc<Mutex<()>>), but is kept locked by the writer thread while it is busy. A waiting thread loops, checking the condition it wants and locking the extra Mutex to wait for the writer to finish.

Code:

impl AtomicFile {
    /// Construct a new AtomicFile. stg is the main underlying storage, upd is temporary storage for updates during commit.
    pub fn new(stg: Box<dyn Storage>, upd: Box<dyn Storage>) -> Box<Self> {
        let size = stg.size();
        let mut baf = BasicAtomicFile::new(stg.clone(), upd);
        let (tx, rx) = std::sync::mpsc::channel::<(u64, WMap)>();
        let cf1 = Arc::new(RwLock::new(CommitFile {
            stg,
            map: WMap::default(),
            todo: 0,
            done: 0,
            busy: Arc::new(Mutex::new(())),
        }));
        let cf = cf1.clone();
        std::thread::spawn(move || {
            while let Ok((size, map)) = rx.recv() {
                let busy = cf1.read().unwrap().busy.clone();
                let _lock = busy.lock();
                baf.map = map;
                baf.commit(size);
                cf1.write().unwrap().done_one();
            }
        });
        Box::new(Self {
            map: WMap::default(),
            cf,
            size,
            tx,
        })
    }

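    /// Wait for the write process.
    fn wait(&self) {
        // Clone the Arc so the RwLock read guard is released before blocking.
        let busy = self.cf.read().unwrap().busy.clone();
        // Blocks while the writer holds `busy`; the guard is dropped immediately.
        let _ = busy.lock();
    }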
}
