How to extract the payload from a rumqtt message callback

How would I access the elements of the received message in crate rumqtt such as topic, payload, etc.?

// Set callback for receiving incoming messages.
let callback = |msg| println!("Received payload: {:?}", msg );

let mq_cbs = MqttCallback::new().on_message(callback);

It seems like you could simply do something like:

// Set callback for receiving incoming messages.
let callback = |msg| println!("Received payload: {:?}", *msg.payload );

let mq_cbs = MqttCallback::new().on_message(callback);

Be careful, though: the payload is a Vec<u8> wrapped in an Arc, so you may need an extra step (for example, converting the bytes to a string) to print it readably.

http://atherenergy.github.io/rumqtt/rumqtt/struct.Message.html

When I do that, msg.payload throws an error: [rustc] the type of this value must be known in this context.

Can you paste the whole error?

Are you using cargo or directly rustc?

I am using cargo.

Here is the cargo error:

error[E0619]: the type of this value must be known in this context
--> src/main.rs:126:45
|
126 | println!("Received payload: {:?}", *msg.payload );
| ^^^^^^^^^^^

error: aborting due to previous error

Accessing msg.payload inside the println! macro requires the type of msg to be known, but it cannot be inferred from the context at that point. Add an explicit type to the msg argument: let callback = |msg: rumqtt::Message| ... .
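
A minimal sketch of the annotated closure. To keep it compilable without the rumqtt crate, it uses a stand-in Message struct (hypothetical, modelling only the topic and payload fields discussed here); with rumqtt itself the annotation would be |msg: rumqtt::Message|.

```rust
use std::sync::Arc;

// Stand-in for rumqtt::Message (an assumption for illustration only; the
// real struct has more fields and its topic type may differ).
struct Message {
    topic: String,
    payload: Arc<Vec<u8>>,
}

// Formats a message through a closure whose argument type is spelled out.
fn render(msg: Message) -> String {
    // With `msg: Message` annotated, the compiler knows which struct the
    // field accesses `msg.topic` and `msg.payload` refer to.
    let callback = |msg: Message| format!("{} -> {:?}", msg.topic, *msg.payload);
    callback(msg)
}

fn main() {
    let msg = Message {
        topic: "sensors/temp".to_string(),
        payload: Arc::new(b"hi".to_vec()),
    };
    println!("{}", render(msg));
}
```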

Nice. I didn't expect that since when I put the mouse over msg it identified the data type. Now what would be the best way to extract the payload from a vector wrapped in an Arc?

What does extract mean exactly? Arc<Vec<u8>> lets you get at the underlying vec transparently (for immutable operations) because Arc derefs to the underlying value.
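
As a small illustration of that deref behaviour, the sketch below uses only the standard library. The helper names checksum and as_text are made up for this example and are not part of rumqtt.

```rust
use std::sync::Arc;

// Sums the bytes; `iter()` is a slice method reached through Arc -> Vec -> [u8].
fn checksum(payload: &Arc<Vec<u8>>) -> u32 {
    payload.iter().map(|&b| u32::from(b)).sum()
}

// Borrows the payload as text if it is valid UTF-8; no copy is made.
fn as_text(payload: &Arc<Vec<u8>>) -> Option<&str> {
    // `&Arc<Vec<u8>>` deref-coerces to `&[u8]` here.
    std::str::from_utf8(payload).ok()
}

fn main() {
    let payload: Arc<Vec<u8>> = Arc::new(b"21.5".to_vec());

    // Vec and slice methods are available directly on the Arc.
    println!("len = {}", payload.len());
    println!("first byte = {}", payload[0]);
    println!("checksum = {}", checksum(&payload));
    println!("text = {:?}", as_text(&payload));

    // If an independent, mutable Vec is needed, copy the bytes out.
    let mut owned: Vec<u8> = payload.to_vec();
    owned.push(b'C');
    println!("owned copy len = {}", owned.len());
}
```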

Got it. That I didn't know and now everything is all good! Thank you all!

I actually have one additional question; I used the following to print the individual elements of "msg":

        let t = &msg.topic.as_str();
        let s = String::from_utf8_lossy(&msg.payload);
        println!("Topic = {}", t);
        println!("Received payload: {}", s);

How can I use t and s outside of the closure?

Rust's scoping rules mean that you must have an owned copy of the data, in order to "give them away" outside the closure.
String::from_utf8_lossy returns a Cow<str>, which may still borrow from the payload; calling .into_owned() on it gives you an owned String, which can then be assigned to an "outside" variable.
The topic looks like it is a (non-owned, borrowed) str, so you either need to "upgrade" it to an owned String, or do everything in the closure.
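
A minimal sketch of turning both borrowed values into owned Strings so they can outlive the message. The Message struct is a hypothetical stand-in (the real rumqtt topic type may differ) and owned_parts is a name made up for this example.

```rust
use std::sync::Arc;

// Stand-in for the callback argument (assumption for illustration only).
struct Message {
    topic: String,
    payload: Arc<Vec<u8>>,
}

// Returns owned copies of the topic and the payload text.
fn owned_parts(msg: &Message) -> (String, String) {
    let t: &str = msg.topic.as_str(); // borrowed from `msg`
    let s = String::from_utf8_lossy(&msg.payload); // Cow<str>, may also borrow
    // `to_owned` and `into_owned` copy the data, so nothing borrows `msg` anymore.
    (t.to_owned(), s.into_owned())
}

fn main() {
    let kept = {
        let msg = Message {
            topic: "sensors/temp".to_string(),
            payload: Arc::new(b"21.5".to_vec()),
        };
        let parts = owned_parts(&msg);
        parts
        // `msg` is dropped at the end of this block; `parts` lives on.
    };
    println!("Topic = {}", kept.0);
    println!("Received payload: {}", kept.1);
}
```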

What exactly are you doing that you need them "outside" the closure? Maybe you can also do the logic inside?

I need the data outside as it needs to remain around for a while even as new data (messages) is coming in. What would the assignment to an outside variable look like? Every time I try to make an assignment to an outside variable I get an error: [rustc] expected a closure that implements the Fn trait, but this closure only implements FnMut

You will probably need to create a channel and send the received data over it inside the closure, and have some other code read from this channel.

When I create a simple channel:

let (tx,rx) = channel();

let callback = |msg: rumqtt::Message| {
    let t = &msg.topic.as_str();
    let s = String::from_utf8_lossy(&msg.payload);
    //println!("Topic = {}", t);
    //println!("Received payload: {}", s);
    tx.send(10).unwrap();
};

println!("Channel data = {}", rx.recv().unwrap());

let mq_cbs = MqttCallback::new().on_message(callback);

The calling function (on_message) errors out with:

error[E0277]: the trait bound std::sync::mpsc::Sender<i32>: std::marker::Sync is not satisfied in [closure@src/main.rs:133:20: 139:6 tx:&std::sync::mpsc::Sender<i32>]
--> src/main.rs:143:38
|
143 | let mq_cbs = MqttCallback::new().on_message(callback);
| ^^^^^^^^^^ std::sync::mpsc::Sender<i32> cannot be shared between threads safely
|
= help: within [closure@src/main.rs:133:20: 139:6 tx:&std::sync::mpsc::Sender<i32>], the trait std::marker::Sync is not implemented for std::sync::mpsc::Sender<i32>
= note: required because it appears within the type &std::sync::mpsc::Sender<i32>
= note: required because it appears within the type [closure@src/main.rs:133:20: 139:6 tx:&std::sync::mpsc::Sender<i32>]

error: aborting due to previous error

Looks like you’ll need to put the Sender inside a Mutex.

Is rumqtt the primary mqtt lib for Rust? It looks a bit rough around the edges.

Unfortunately I haven't found a better all-Rust client that is close to "asynchronous". I have considered using the Eclipse Paho Rust client, which is a wrapper around their C library. I use that library in another C application I have written and it works well. If I continue with this, what would placing the sender inside a Mutex look like?

let (tx, rx) = channel();
let tx = Mutex::new(tx);
...
tx.lock().unwrap().send(10).unwrap();
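
For a fuller picture, here is a self-contained sketch of the same idea that sends owned (topic, payload) pairs instead of 10. The function run_with_callback is a hypothetical stand-in for the MQTT client: its Fn + Send + Sync + 'static bounds are an assumption based on the error messages above, and rumqtt's real signature may differ.

```rust
use std::sync::mpsc::channel;
use std::sync::Mutex;
use std::thread;

// Hypothetical stand-in for the client: invokes the callback on another
// thread for each message, then waits for that thread to finish.
fn run_with_callback<F>(callback: F, messages: Vec<(String, Vec<u8>)>)
where
    F: Fn(&str, &[u8]) + Send + Sync + 'static,
{
    let handle = thread::spawn(move || {
        for (topic, payload) in &messages {
            callback(topic.as_str(), payload.as_slice());
        }
    });
    handle.join().unwrap();
}

// Forwards every message through a channel and collects what arrived.
fn collect(messages: Vec<(String, Vec<u8>)>) -> Vec<(String, String)> {
    let (tx, rx) = channel::<(String, String)>();
    // The Mutex makes the captured Sender shareable between threads.
    let tx = Mutex::new(tx);

    run_with_callback(
        // `move` gives the closure ownership of `tx`; it is still `Fn`
        // because `lock()` and `send()` only need shared references.
        move |topic: &str, payload: &[u8]| {
            let text = String::from_utf8_lossy(payload).into_owned();
            tx.lock().unwrap().send((topic.to_owned(), text)).unwrap();
        },
        messages,
    );

    // The closure (and with it the Sender) has been dropped by now, so
    // this iterator ends once the buffered messages are drained.
    rx.iter().collect()
}

fn main() {
    let received = collect(vec![
        ("sensors/temp".to_string(), b"21.5".to_vec()),
        ("sensors/hum".to_string(), b"40".to_vec()),
    ]);
    for (topic, payload) in received {
        println!("Topic = {}, payload = {}", topic, payload);
    }
}
```

In a real program the receiving side would normally run in its own loop or thread rather than after the client finishes. On newer toolchains (Rust 1.72 and later, where Sender is itself Sync, as far as I know) the Mutex wrapper should no longer be strictly required, though it remains harmless.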

The original intent of the Paho MQTT Rust client was to move towards a "100%" Rust solution in a future version. But new features are being added to the underlying C library at a quick pace right now, including WebSockets and MQTT v5 support, and I feel that time spent on feature support is a higher priority than more "Rustiness" in the library.

Last year, a few members of the Paho team were talking about language options for new projects (brokers, bridges, etc.), and Rust came up as a likely candidate. The Rust client library is our initial foray into the language to see how it feels. So far, so good!

I'm committed to getting the Paho Rust library production-ready over the next few months, and will be taking it into commercial, production use at the same time. So I'm aiming to make it pretty solid.

Any help, guidance, comments, and support would be greatly appreciated.

Frank P.
Paho Rust and C++ Maintainer

Is the paho rust mqtt lib available somewhere? Ah, I guess it’s https://github.com/eclipse/paho.mqtt.rust - I didn’t see it on crates.io and thought it didn’t exist yet.
