Async callback closure problem

Hello,

I've been struggling with a problem for a couple of days now and I can't figure out how to solve

I have created a trait, that in the subscribe function you passing in an asynchronous callback function (Here I'm unsure, if it's all correct, if I miss any lifetime parameters, and so on).

#[async_trait]
pub trait Client<'a> {
    async fn connect(&mut self, host_address: &str) -> Result<(), ClientError>;

    async fn publish<T>(&mut self, messages: &[Message<T>]) -> Result<(), ClientError>
    where
        T: ToString + Send + Sync;

    async fn subscribe<T, F, Fut>(
        &mut self,
        messages: &[Message<T>],
        on_signal_change: bool,
        cb: F,
    ) -> Result<(), ClientError>
    where
        F: Fn(Message<String>) -> Fut + Send + Sync,
        Fut: Future<Output = ()> + Send,
        T: Debug + Send + Sync + ToString + Display;
}

Later I have implemented this trait for a couple of clients that I'm going to use.
Here is my file where I test this client setup:

pub struct TestClients {
    beamy_client: BeamyBrokerClient,
    mqtt_client: MqttBrokerClient,
}

impl TestClients {
    pub fn new() -> Self {
        let beamy_client = BeamyBrokerClient::new();
        let mqtt_client = MqttBrokerClient::new();

        Self {
            beamy_client,
            mqtt_client,
        }
    }

    pub async fn init(&mut self, beamy_address: &str, mqtt_address: &str) {
        self.beamy_client.connect(beamy_address).await.unwrap();
        self.mqtt_client.connect(mqtt_address).await.unwrap();
    }

    pub async fn run(&mut self) -> Result<(), Box<dyn Error>> {
        let signals = Arc::new(Mutex::new(Vec::default()));

        self.mqtt_client
            .subscribe::<String, _, _>(
                &[Message::new("SIGNAL/SIGNALS", "SUBSCRIBE", None)],
                true,
                |message| async move {
                    let signals = Arc::clone(&signals);
                    let data: Vec<Message<String>> =
                        serde_json::from_str(message.value().unwrap()).unwrap();

                    let mut signals_data = signals.lock().await;
                    *signals_data = data;
                },
            )
            .await?;

        Ok(())
    }
}

And this issure occurs:

32 |           let signals = Arc::new(Mutex::new(Vec::default()));
   |               ------- captured outer variable
...
38 |                   |message| async move {
   |  ______________________________________^
39 | |                     let signals = Arc::clone(&signals);
   | |                                               -------
   | |                                               |
   | |                                               move occurs because `signals` has type `Arc<tokio::sync::Mutex<Vec<Message<std::string::String>>>>`, which does not implement the `Copy` trait
   | |                                               move occurs due to use in generator
40 | |                     let data: Vec<Message<String>> =
41 | |                         serde_json::from_str(message.value().unwrap()).unwrap();
...  |
44 | |                     *signals_data = data;
45 | |                 },
   | |_________________^ move out of `signals` occurs here

And I really don't know how to get forward from this.
Is this possible that I'm trying to achieve here? Can I access outer variables inside this closure?

Best regards,
Niclas

If the closure could only be called at most once, then the fix would be to add move to the closure as well (you only have it on the async block, which is inside the closure). However, in your case, you must also clone signals for each call, and you must do so before you get inside the async block.

self.mqtt_client
    .subscribe::<String, _, _>(
        &[Message::new("SIGNAL/SIGNALS", "SUBSCRIBE", None)],
        true,
        move |message| {
            let signals = Arc::clone(&signals);
            async move {
                let data: Vec<Message<String>> =
                    serde_json::from_str(message.value().unwrap()).unwrap();
    
                let mut signals_data = signals.lock().await;
                *signals_data = data;
            }
        }
    )
    .await?;
1 Like

Thanks!
This explains a lot.

But I still need to read more about asynchronous programming, but that's another discussion. :stuck_out_tongue: