Hi
I have some MQTT code for subscribing and publishing using the Paho MQTT Rust client: https://github.com/eclipse/paho.mqtt.rust
In main I call both functions, but only the subscribe function seems to work; the publish function does not execute when I call it. I was wondering what the problem is and how to fix it. Also, how can I put this closure:
// Register a handler that the client invokes for incoming messages.
// The handler is given an `Option`; only the `Some` case is printed.
cli.set_message_callback(|_client, incoming| {
    if let Some(message) = incoming {
        println!("{} - {}", message.topic(), message.payload_str());
    }
});
into an on_message function, so that whenever I subscribe and then publish, the topic and payload are printed via the callback from inside the on_message function?
extern crate log;
extern crate env_logger;
extern crate paho_mqtt as mqtt;
use std::{env, process, thread};
use std::time::Duration;
use futures::Future;
/// Topic names handed to `subscribe_many` once a connection succeeds.
const TOPICS: &[&str] = &["test", "hello"];

/// QoS values handed to `subscribe_many` together with `TOPICS`.
// NOTE(review): presumably matched to `TOPICS` entry by entry; confirm against the client API.
const QOS: &[i32] = &[1, 1];
/// Connection-success callback.
///
/// Announces the successful connection and then asks the client to subscribe
/// to every entry of `TOPICS` with the QoS values in `QOS`.
fn on_connect_success(cli: &mqtt::AsyncClient, _msgid: u16) {
    println!("Connection succeeded");

    // Issue the subscription request; its result is not inspected here.
    cli.subscribe_many(TOPICS, QOS);
    println!("Subscribing to topics: {:?}", TOPICS);

    // TODO: A failed subscription is not handled yet.
}
/// Connection-failure callback: report the error code, pause, and retry.
///
/// Sleeping inside a callback is normally something to avoid. It is tolerated
/// here because the client is known to be disconnected at this point, so
/// stalling its callback thread does not hold up any other work.
fn on_connect_failure(cli: &mqtt::AsyncClient, _msgid: u16, rc: i32) {
    println!("Connection attempt failed with error code {}.\n", rc);

    // Back off before the next attempt.
    let retry_delay = Duration::from_millis(2500);
    thread::sleep(retry_delay);

    // Retry with the same pair of callbacks, so failures keep retrying.
    cli.reconnect_with_callbacks(on_connect_success, on_connect_failure);
}
/////////////////////////////////////////////////////////////////////////////
/// Message callback: prints the topic and payload of an incoming message.
///
/// The client hands the callback an `Option`; when it is `None` nothing is
/// printed.
fn on_message(_cli: &mqtt::AsyncClient, msg: Option<mqtt::Message>) {
    if let Some(msg) = msg {
        println!("{} - {}", msg.topic(), msg.payload_str());
    }
}

/// Runs the subscriber: connects to the broker, subscribes (via
/// `on_connect_success`) and prints every incoming message (via `on_message`).
///
/// The broker host is taken from the first command-line argument and defaults
/// to `test.mosquitto.org`. If the client cannot be created the process exits
/// with status 1.
///
/// This function never returns: after starting the connection it sleeps in an
/// endless loop while the client's callbacks do the work. Call it on a thread
/// that is allowed to block forever.
fn subscribe() {
    // Initialize the logger from the environment. `try_init` is used instead
    // of `init` because `publish` initializes the logger as well, and `init`
    // panics when a logger is already installed. The error only means
    // "already initialized", so it is deliberately ignored.
    let _ = env_logger::try_init();

    let host = env::args()
        .nth(1)
        .unwrap_or_else(|| "test.mosquitto.org".to_string());

    // Create the client with a fixed ID.
    // A real system should try harder to use a unique ID.
    // NOTE(review): the original comment spoke of a persistent session, but
    // the connect options below set `clean_session(true)` -- confirm which
    // one is intended.
    let create_opts = mqtt::CreateOptionsBuilder::new()
        .server_uri(host)
        .client_id("rust_async_subscribe")
        .finalize();

    let mut cli = mqtt::AsyncClient::new(create_opts).unwrap_or_else(|e| {
        println!("Error creating the client: {:?}", e);
        process::exit(1);
    });

    // When the connection drops, wait a little and reconnect. The callbacks
    // passed here keep retrying until the connection is re-established.
    cli.set_connection_lost_callback(|cli: &mqtt::AsyncClient| {
        println!("Connection lost. Attempting reconnect.");
        thread::sleep(Duration::from_millis(2500));
        cli.reconnect_with_callbacks(on_connect_success, on_connect_failure);
    });

    // Route incoming messages to the named `on_message` function.
    cli.set_message_callback(on_message);

    // Connection options, including a last-will (LWT) message on "test".
    let lwt = mqtt::Message::new("test", "Async subscriber lost connection", 1);
    let conn_opts = mqtt::ConnectOptionsBuilder::new()
        .keep_alive_interval(Duration::from_secs(20))
        .mqtt_version(mqtt::MQTT_VERSION_3_1_1)
        .clean_session(true)
        .will_message(lwt)
        .finalize();

    // Start connecting; the subscription itself happens in `on_connect_success`.
    println!("Connecting to the MQTT server...");
    cli.connect_with_callbacks(conn_opts, on_connect_success, on_connect_failure);

    // Just wait for incoming messages.
    // Hitting ^C will exit the app and cause the broker to publish the
    // LWT message since we're not disconnecting cleanly.
    loop {
        thread::sleep(Duration::from_millis(1000));
    }
}
/// Publishes a single "Hello Rust MQTT world!" message on the `test` topic.
///
/// The broker host is taken from the first command-line argument and defaults
/// to `test.mosquitto.org`. The function connects, publishes, and disconnects,
/// blocking on each step until it completes. If the client cannot be created
/// the process exits with status 1; a failed connect, publish, or disconnect
/// is reported on stdout and the function returns.
fn publish() {
    // Initialize the logger from the environment. `try_init` is used instead
    // of `init` because `subscribe` initializes the logger as well, and `init`
    // panics when a logger is already installed. The error only means
    // "already initialized", so it is deliberately ignored.
    let _ = env_logger::try_init();

    // Command-line option(s)
    let host = env::args()
        .nth(1)
        .unwrap_or_else(|| "test.mosquitto.org".to_string());

    // Create a client to the specified host, no persistence
    let create_opts = mqtt::CreateOptionsBuilder::new()
        .server_uri(host)
        .persistence(mqtt::PersistenceType::None)
        .finalize();

    let cli = mqtt::AsyncClient::new(create_opts).unwrap_or_else(|err| {
        println!("Error creating the client: {}", err);
        process::exit(1);
    });

    // Connect with default options and wait for the outcome. Without an
    // established connection the publish below cannot reach the broker.
    let conn_opts = mqtt::ConnectOptions::new();
    if let Err(err) = cli.connect(conn_opts).wait() {
        println!("Unable to connect: {}", err);
        return;
    }

    // Create a message and publish it, waiting for the publish to complete so
    // the client is not dropped while the message is still in flight.
    println!("Publishing a message on the 'test' topic");
    let msg = mqtt::Message::new("test", "Hello Rust MQTT world!", 0);
    if let Err(err) = cli.publish(msg).wait() {
        println!("Error sending message: {}", err);
    }

    // Disconnect from the broker.
    if let Err(err) = cli.disconnect(None).wait() {
        println!("Error disconnecting: {}", err);
    }
}
fn main() {
subscribe();
publish();
}
Thanks