What's the idiomatic way to implement this relationship and functionality?

Greetings,

I am new to Rust and right now tinkering with it, wondering if I should use it for the software on an embedded device I am developing on my job.

Right now I am writing a test program that listens to get messages over MQTT(using rumqtt) and then call functions in a hashmap associated with the MQTT topics.

So far my code looks something like this:

struct myCom {
    pub receiver: Receiver<Notification>,
    callback_map: HashMap<String, fn()>,
}

impl myCom {
   pub fn listen(&self) {
        thread::spawn(move || loop {
                         for notification in &self.receiver {
                    match notification {
                        rumqtt::client::Notification::Publish(packet) => {
                            println!("{:?}", packet);
                            match self.callback_map.get(&packet.topic_name) {
                                Some(&func) => {
                                    func();
                                }
                                _ => println!("No callback associated with this topic"),
                            }
                        },
                        _ => println!("Nope"),
                    }
                }
        });
}

  pub fn add_callback(&self, topic_name: String, callback: fn()) {
        &self.callback_map.insert(topic_name, callback);
    }
}

This works in the usecase where I call add_callback() before listen(). However, I would like to be able to add callbacks at a later stage as well, and the way I have implemented this gives me two different errors, depending on the changes I make:

If I have listen(&self), I get the error that "cannot infer an appropriate lifetime due to conflicting requirements" on the spawn thread line, which I guess is that the lifetime of the Receiver cannot be guaranteed for the lifetime of the "endless" listen function?
If I have listen(self) then I get the error "borrow of moved value: myStruct", if I try to call add_callback after I called listen(), which I guess is since the endless looping listen function owns the initialized struct.

So what is the correct way of composition for this kind of problem?
What is the correct way of safely both writing (add callbacks) to this hashmap and also reading values in the listen()-function? I have added Arc<Mutex>, which I think makes writing and reading safe. I still have the lifetime problem though.
After two weeks I really like Rust and how it makes me think about these things, rather than allowing everything and then failing at a later stage.

I'm new to all this but I notice you are using 'self.callback_map' in both your thread and in your main line code. This is a no-no in any language unless there is some mutual exclusion wrapped around those accesses. That is beside the issue of life times.

The Rust book chapter on threads show a nice example of sharing data between threads using 'Arc' https://doc.rust-lang.org/book/ch16-03-shared-state.html

The trick here is the need to 'clone' the Arc before handing it over to the thread:

Your code might look something like this then:

        let callback_map = Arc::clone(&self.callback_map);
        let handle = thread::spawn(move || {
            let mut callback_map = callback_map.lock().unwrap();
        ...
        });

Thank you for your reply. This is exactly what I have done (I think, I am not near my computer right now, can share it tomorrow). I also mentioned this in the text, but I avoided including it in my code sample as I wanted to make it as easy as possible.
Even though this is indeed safe, I still get the lifetime error. I have also experimented with using the lifetime specifier, but with no luck so far.

I might be wrong but I'm pretty sure one needs atomicity (mutual exclusion) when reading and writing data from different threads. Hence the "A" in "Arc".

Then one needs reference counting to deal with the lifetime problem and safe deletion of shared objects. Hence the "rc" in "Arc".

What I'm not sure about is that your HashMap is a member of a struct. So there must be lifetime issues to think about there.

The only way to sort this is to see the actual code you have and the actual compilation errors it produces. Not some other different code.

If you can make a short and simple example on the Rust playground that would be even better:https://play.rust-lang.org/

Thanks again, I have indeed used Arc. Unfortunately Playground does not have rumqtt which I rely heavily on (also, I connect to a private broker, which make it a bit hard to easily share a sample), but I will make a fully working sample by tomorrow.

Arc doesn't provide any mutual exclusion -- for that you're going to need a Mutex (or something like it). Here's an example sharing access to a HashMap from multiple threads (building on the simple counter example from the Shared State chapter you linked to):

use std::sync::{Mutex, Arc};
use std::thread;
use std::collections::HashMap;

fn main() {
    let map = Arc::new(Mutex::new(HashMap::<u32, bool>::new()));
    let mut handles = vec![];

    for tid in 0..10 {
        let map = Arc::clone(&map);
        let handle = thread::spawn(move || {
            let mut map = map.lock().unwrap();
            map.insert(tid, true);
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("Contents: {:?}", map.lock().unwrap());
}

(playground)

Yeah, like that. Sorry I forgot the mutex part.

It's confusing.

An unrelated question. Is there a difference between Arc::clone(&map) and map.clone() in this example? I find the latter easier to read, but the former makes it clear that map is an Arc.

This is the difference. I prefer the former.

I'm not sure how the attached code can be compiled. Typically you will need to move receiver and callback_map into the closure - either by cloning or moving them out of self. What I usually do is:

  • Spawn the thread in Self::new
  • Use mpsc channel to communicate with the thread

The thread will have ownership of callback_map and it receives new callback though mpsc channel.

You can use a receiver that can receive both callbacks and notifications. You can either use crossbeam::select! or define an enum.

This construction works if you don't need callback_map in any other places, otherwise you can always store the HashMap inside an Arc<Mutex<>>.

1 Like

Thank you!
I should have provided a working sample from the beginning. I will try out your recommendations now, and get back to you if it is working.
I have also provided code that compiles and runs.
If you do the add_callback after it started listening, it will complain about the moved value (which, of course, totally makes sense).
I will check out the mpsc channel solution.

use std::{fs::read, thread, collections::HashMap, sync::{Arc, Mutex}};
use rumqtt::{ReconnectOptions, Receiver, Notification, MqttClient, MqttOptions};

fn my_test() {
    println!("I callbacked");
}

fn main() {
    let client_id = String::from("myclient");
    let ca_path = String::from("root-CA.crt");
    let client_cert_path = String::from("client.pem");
    let client_key_path = String::from("private.key");
    let endpoint = String::from("endpoint.com");

    let mqtt_options = MqttOptions::new(client_id, endpoint, 8883)
        .set_ca(read(ca_path).unwrap())
        .set_client_auth(read(client_cert_path).unwrap(), read(client_key_path).unwrap())
        .set_keep_alive(10)
        .set_reconnect_opts(ReconnectOptions::Always(5));
    let mqtt_client = MqttClient::start(mqtt_options).unwrap();
    let my_client = MyStruct { client: mqtt_client.0, receiver: mqtt_client.1, callback_map: Arc::new(Mutex::new(HashMap::new())) };
    my_client.add_callback(String::from("mytopic"), my_test);
    my_client.start_listening();
}

impl MyStruct {
    pub fn start_listening(self) {
        let cb_map = Arc::clone(&self.callback_map);
        thread::spawn(move || loop {
            for notification in &self.receiver {
                    match notification {
                        rumqtt::client::Notification::Publish(packet) => {
                            println!("{:?}", packet);
                            let map_lock = cb_map.lock().unwrap();
                            match map_lock.get(&packet.topic_name) {
                                Some(&func) => {
                                    func();
                                }
                                _ => println!("No callback associated with this topic"),
                            }

                        },
                        _ => println!("Nope"),
                    }
                }
        });
    }

    pub fn add_callback(&self, topic_name: String, callback: fn()) {
        let cb_map = Arc::clone(&self.callback_map);
        let mut map_lock = cb_map.lock().unwrap();
        map_lock.insert(topic_name, callback);
    }
}


pub struct MyStruct {
    pub client: MqttClient,
    pub receiver: Receiver<Notification>,
    callback_map: Arc<Mutex<HashMap<String, fn()>>>,
}

You could make a new CallbackAdder type that has a clone of the arc and an add_callback method. This would continue to work after calling start_listening.

Note that the clone is not necessary in add_callback, as the borrow of the map is released when the function returns.

pub fn add_callback(&self, topic_name: String, callback: fn()) {
    self.callback_map
        .lock()
        .unwrap()
        .insert(topic_name, callback);
}
1 Like

There is another simple fix for you code, that is, use receiver: Option<Receiver<Notification>> and call receiver.take() to move the value out, this way start_listening can take a &mut self instead.

1 Like

Note that the receiver in rumqtt is a reexport of crossbeam's receiver, so you can just clone the receiver instead of moving it using an option. However, moving it out enforces that only one thread is spawned, so that may be preferable.

Thanks everyone, lots of helpful answers here. What a community!

But I am a bit confused though:
If I do .clone() or .take() it, then make the function &mut self, will I clone outside of the function call, then send it as a parameter? And is it the fact that I clone the receiver that will remove the lifetime problem with spawning the thread? Wouldn't the spawned thread be dependent on the lifetime of the (cloned) receiver anyways?

I guess I have some reading to do...:smile:

I would clone or take it inside the function but before spawning the thread, then move the clone into the closure.

A clone is never a borrow of the thing you cloned. The crossbeam channel uses something similar to Arc internally which allows you to clone it without borrowing the original channel, and in fact the clone can live longer than the thing it is a clone of.

1 Like

Thank you, I did a cloning before spawning the thread and moved the clone inside, and now it works just like I wanted!
Not only does it work now, but I also learnt a lot in the process, thanks!

This topic was automatically closed 90 days after the last reply. New replies are no longer allowed.