Condvar for RwLock?

I have a counter protected by an Arc<RwLock>.

When the counter reaches a limit, I need to wait until it goes back to zero.

Another thread decrements the counter (it performs commits to update the database, but the number of outstanding commits needs to drop to zero occasionally so that memory can be freed; see here).

It seems like std::sync::Condvar nearly does the job, but it works with std::sync::Mutex rather than std::sync::RwLock.

So, any ideas how I should proceed? (I have a few vague ideas, like using a channel, but I am wondering if there is a better way.)
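For comparison, here is a minimal sketch of what the Condvar route looks like if the counter can live in its own Mutex next to the RwLock-protected data. The names Outstanding, add, done_one, wait_for_zero and demo are made up for illustration and are not from the code discussed in this thread.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

/// Hypothetical shared state: a count of outstanding commits plus a Condvar.
struct Outstanding {
    count: Mutex<u32>,
    zero: Condvar,
}

impl Outstanding {
    fn new() -> Self {
        Self { count: Mutex::new(0), zero: Condvar::new() }
    }

    /// Record one more outstanding commit.
    fn add(&self) {
        *self.count.lock().unwrap() += 1;
    }

    /// Record a finished commit; wake all waiters when nothing is outstanding.
    fn done_one(&self) {
        let mut n = self.count.lock().unwrap();
        *n -= 1;
        if *n == 0 {
            self.zero.notify_all();
        }
    }

    /// Block until the count is zero. wait_while re-checks the condition
    /// after every wakeup, so spurious wakeups are harmless.
    fn wait_for_zero(&self) {
        let guard = self.count.lock().unwrap();
        let _guard = self.zero.wait_while(guard, |n| *n != 0).unwrap();
    }
}

/// Demo: queue `n` commits, finish them on another thread, wait for zero.
fn demo(n: u32) -> u32 {
    let shared = Arc::new(Outstanding::new());
    for _ in 0..n {
        shared.add();
    }
    let worker = {
        let shared = Arc::clone(&shared);
        thread::spawn(move || {
            for _ in 0..n {
                shared.done_one();
            }
        })
    };
    shared.wait_for_zero();
    worker.join().unwrap();
    let remaining = *shared.count.lock().unwrap();
    remaining
}
```

The trade-off is that the counter has to move out of the RwLock and into a Mutex of its own, which may or may not suit the rest of the data.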

If no other data is protected by the RwLock, consider using an AtomicU32 counter inside the Arc and combining it with the atomic_wait crate for wait/wake.

Hmm, I would prefer a solution not involving a crate. Also, there is other data. I think perhaps a boolean in the RwLock can be set to indicate that the other thread is waiting, and if so, an (empty) message can be sent through a channel to wake it up when the counter goes to zero. I think that works, although I am not quite sure!

[I also slightly misstated the problem: the condition is not the counter reaching a limit, it is the number of commits since the counter last went to zero that needs to be limited.]

Well, I used a boolean and a channel. I think another way might be for one thread to hold a Mutex "long term" and release it at the appropriate time to allow the other thread to proceed, but the channel seems the more straightforward solution in terms of what I know and understand.
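A rough sketch of the boolean-plus-channel idea, assuming a single waiting client. This is not the code linked from this thread; the State struct and the functions done_one, wait_for_zero and demo are hypothetical names chosen for the example.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, RwLock};
use std::thread;

/// Hypothetical state protected by the RwLock.
struct State {
    todo: u32,            // outstanding commits
    client_waiting: bool, // set by a client that is about to block
}

/// Writer side: one commit finished. Sends an empty message if the
/// client asked to be woken and nothing is outstanding any more.
fn done_one(state: &RwLock<State>, wake: &Sender<()>) {
    let mut s = state.write().unwrap();
    s.todo -= 1;
    if s.todo == 0 && s.client_waiting {
        s.client_waiting = false;
        // Ignore the error if the client has gone away.
        let _ = wake.send(());
    }
}

/// Client side: block until `todo` is zero.
fn wait_for_zero(state: &RwLock<State>, woken: &Receiver<()>) {
    {
        let mut s = state.write().unwrap();
        if s.todo == 0 {
            return;
        }
        s.client_waiting = true;
    } // the write lock is released here, before blocking
    woken.recv().unwrap();
}

/// Demo: start with `n` outstanding commits and finish them on another thread.
fn demo(n: u32) -> u32 {
    let (tx, rx) = channel();
    let state = Arc::new(RwLock::new(State { todo: n, client_waiting: false }));
    let writer = {
        let state = Arc::clone(&state);
        thread::spawn(move || {
            for _ in 0..n {
                done_one(&state, &tx);
            }
        })
    };
    wait_for_zero(&state, &rx);
    writer.join().unwrap();
    let remaining = state.read().unwrap().todo;
    remaining
}
```

Because the flag is set and tested under the write lock and the channel buffers the message, the wakeup cannot be lost even if the writer reaches zero before the client calls recv.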

Code is here: source

Is it a thread that is just there for the given cleanup task? If so you can use thread::park and thread::unpark.


I didn't know about thread::park at all before! Yes, that looks like it could do the job nicely, thanks.

Here is the version using park/unpark.

impl Storage for AtomicFile {
    fn commit(&mut self, size: u64) {
        self.size = size;
        if {
        while {
            let cf = &mut;
            // If the CommitFile map has got "large" wait for the commit process to finish (so the map is reset).
            if cf.wait(3000) {
            } else {
                let map = std::mem::take(&mut;
                cf.todo += 1;
                for (k, v) in {
                    let start = k + 1 - v.len as u64;
                    cf.write_data(start,,, v.len);
                self.tx.send((size, map)).unwrap();
        } {
impl CommitFile {
    fn done_one(&mut self) {
        self.todo -= 1;
        if self.todo == 0 {
   = WMap::default();
            if self.client_waiting {
                self.client_waiting = false;

Full code here:

As documented for park in std::thread, be aware of spurious wakeups: the call to park may return before the thread is explicitly unparked. The example in the docs takes care of this subtle but important detail.
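A small sketch of the usual pattern, along the lines of the example in the docs: park inside a loop that re-checks a flag, so a spurious return from park just goes round the loop again. The helper names park_until and demo are invented for this example.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

/// Park the current thread until `flag` becomes true.
/// The loop guards against park returning without a matching unpark.
fn park_until(flag: &AtomicBool) {
    while !flag.load(Ordering::Acquire) {
        thread::park();
    }
}

/// Demo: a waiter thread parks until the main thread sets the flag and unparks it.
fn demo() -> bool {
    let done = Arc::new(AtomicBool::new(false));
    let waiter = {
        let done = Arc::clone(&done);
        thread::spawn(move || {
            park_until(&done);
            done.load(Ordering::Acquire)
        })
    };
    thread::sleep(Duration::from_millis(10));
    // Set the flag first, then unpark, so the waiter sees it when it wakes.
    done.store(true, Ordering::Release);
    waiter.thread().unpark();
    waiter.join().unwrap()
}
```

Note that unpark needs a handle to the specific thread that is waiting, which is why this approach gets awkward once more than one thread may need to wait.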


Yes, I don't really understand why there would be spurious wake-ups, but I did change an "if" to a "while" to allow for this.

I also realised shortly after my previous post that I had falsely assumed the thread constructing AtomicFile would be the thread that used it. So I just fixed that. It does seem that compared to using a Channel there are pitfalls. I am considering going back to the channel version on grounds that it is perhaps the "safer" option.

Fixed version here: rustdb::atomfile - Rust

Ok... the park/unpark method, as well as being tricky, proved to be a bit inflexible when there are potentially multiple threads waiting for the database commits to complete.

What I have ended up using, and it seems quite neat, is an extra Mutex which doesn't protect any data (it is declared as Arc<Mutex<()>>), but is kept locked by the writer thread while it is busy. A waiting thread loops, checking the condition it wants and locking the extra Mutex to wait for the writer to finish.


impl AtomicFile {
    /// Construct a new AtomicFile. stg is the main underlying storage, upd is temporary storage for updates during commit.
    pub fn new(stg: Box<dyn Storage>, upd: Box<dyn Storage>) -> Box<Self> {
        let size = stg.size();
        let mut baf = BasicAtomicFile::new(stg.clone(), upd);
        let (tx, rx) = std::sync::mpsc::channel::<(u64, WMap)>();
        let cf1 = Arc::new(RwLock::new(CommitFile {
            map: WMap::default(),
            todo: 0,
            done: 0,
            busy: Arc::new(Mutex::new(())),
        let cf = cf1.clone();
        std::thread::spawn(move || {
            while let Ok((size, map)) = rx.recv() {
                let busy =;
                let _lock = busy.lock();
       = map;
        Box::new(Self {
            map: WMap::default(),

    /// Wait for the write process.
    fn wait(&self) {
        let busy =;
        let _ = busy.lock();
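
A self-contained sketch of the same idea, reduced to a counter and the extra Mutex. The Shared struct and the functions write_batch, wait_for_zero and demo are hypothetical and much simpler than the real AtomicFile code.

```rust
use std::sync::{Arc, Mutex, RwLock};
use std::thread;

/// Hypothetical shared state. `busy` protects no data; the writer
/// simply holds it for as long as it is working.
struct Shared {
    todo: RwLock<u32>,
    busy: Mutex<()>,
}

/// Writer: hold `busy` for the whole batch, then reset the counter.
fn write_batch(shared: &Shared) {
    let _lock = shared.busy.lock().unwrap();
    // ... the real work would happen here ...
    *shared.todo.write().unwrap() = 0;
}

/// Waiter: loop until the condition holds, blocking on `busy` in between.
fn wait_for_zero(shared: &Shared) {
    while *shared.todo.read().unwrap() != 0 {
        // Blocks while the writer holds `busy`; the guard is dropped at once.
        drop(shared.busy.lock().unwrap());
        // The writer may not have taken `busy` yet, so give it a chance to run.
        thread::yield_now();
    }
}

/// Demo: one writer thread clears the counter while the caller waits.
fn demo() -> u32 {
    let shared = Arc::new(Shared { todo: RwLock::new(3), busy: Mutex::new(()) });
    let writer = {
        let shared = Arc::clone(&shared);
        thread::spawn(move || write_batch(&shared))
    };
    wait_for_zero(&shared);
    writer.join().unwrap();
    let remaining = *shared.todo.read().unwrap();
    remaining
}
```

The difference between the two bindings matters here: `let _lock = ...` keeps the guard alive until the end of the scope, whereas `let _ = ...` (or an explicit drop) releases it immediately, which is what the waiting side wants. Any number of threads can wait this way, at the cost of a brief spin if the writer has not yet taken the lock.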

