Mutex lock in closure

Hello!

First of all, I'm a very beginner in Rust, so sorry if my question sounds stupid.

I try to modify a struct field inside a closure called in an async loop.
However, it seems the Mutex used in the callback is locked for some reason.
I tried to reduce my use case keeping it as simple as possible :

use futures::{executor::block_on, future::join_all};

use std::sync::{Arc, Mutex};

type Callback = Box<dyn Fn()>;

struct Test {
    callback: Option<Callback>,
    data: String,
}

impl Test {
    async fn notify(&self) {
        if let Some(callback) = &self.callback {
            callback();
        }
    }

    fn on(&mut self, callback: Callback) {
        self.callback = Some(callback);
    }

    fn init(test: Arc<Mutex<Test>>) {
        let test_clone = Arc::clone(&test);
        test.lock().unwrap().on(Box::new(move || {
            test_clone.lock().unwrap().data = "listened".to_owned();
        }));
    }

    async fn process(&self) {
        let mut futs = vec![];
        for _elem in 0..2 {
            futs.push(self.notify());
        }
        join_all(futs).await;
    }
}

fn main() {
    let test = Arc::new(Mutex::new(Test {
        callback: None,
        data: "init".to_owned(),
    }));

    let test_clone = Arc::clone(&test);
    Test::init(test_clone);
    block_on(Arc::clone(&test).lock().unwrap().process());
}

Any idea?

Yes, it’s locked. Your call

block_on(Arc::clone(&test).lock().unwrap().process())

locks the mutex and holds onto the lock for the full duration of the block_on operation. This prohibits the callback from locking the same mutex, a nice deadlock.

One way to resolve the problem would be to remove the need of &mut self references on Test entirely, by putting the individual fields into their own Mutex or similar. The callback field could also use an RwLock for your use-case so that notify calling the Fn callback doesn’t need to claim exclusive access to the callback. Or even a OnceCell if you don’t plan on calling .on more than once. Let’s do it with Mutexes for now:

use futures::{executor::block_on, future::join_all};

use std::sync::{Arc, Mutex};

type Callback = Box<dyn Fn()>;

struct Test {
    callback: Mutex<Option<Callback>>,
    data: Mutex<String>,
}

impl Test {
    async fn notify(&self) {
        if let Some(callback) = &*self.callback.lock().unwrap() {
            callback();
        }
    }

    fn on(&self, callback: Callback) {
        *self.callback.lock().unwrap() = Some(callback);
    }

    fn init(test: Arc<Test>) {
        let test_clone = Arc::clone(&test);
        test.on(Box::new(move || {
            *test_clone.data.lock().unwrap() = "listened".to_owned();
        }));
    }

    async fn process(&self) {
        let mut futs = vec![];
        for _elem in 0..2 {
            futs.push(self.notify());
        }
        join_all(futs).await;
    }
}

fn main() {
    let test = Arc::new(Test {
        callback: None.into(),
        data: "init".to_owned().into(),
    });

    let test_clone = Arc::clone(&test);
    Test::init(test_clone);
    block_on(Arc::clone(&test).process());
}

Also note that your code (and also my version of it) creates a reference-cycle, so that dropping an initialized Test will leak memory.


Edit: Here’s a modified version avoiding the leak

use futures::{executor::block_on, future::join_all};

use std::sync::{Arc, Mutex};

type Callback = Box<dyn Fn()>;

struct Test {
    callback: Mutex<Option<Callback>>,
    data: Mutex<String>,
}

impl Test {
    async fn notify(&self) {
        if let Some(callback) = &*self.callback.lock().unwrap() {
            callback();
        }
    }

    fn on(&self, callback: Callback) {
        *self.callback.lock().unwrap() = Some(callback);
    }

    fn init(test: &Arc<Test>) {
        let test_clone = Arc::downgrade(test);
        test.on(Box::new(move || {
            *test_clone.upgrade().unwrap().data.lock().unwrap() = "listened".to_owned();
        }));
    }

    async fn process(&self) {
        let mut futs = vec![];
        for _elem in 0..2 {
            futs.push(self.notify());
        }
        join_all(futs).await;
    }
}

fn main() {
    let test = Arc::new(Test {
        callback: None.into(),
        data: "init".to_owned().into(),
    });

    Test::init(&test);
    block_on(Arc::clone(&test).process());
}
1 Like

Thank you for your detailed response! It works like a charm :slight_smile: