After spending a considerable amount of time studying Rust, I've decided to start rewriting some code. I have a Python program that I want to translate. Essentially, it communicates with a device over Modbus TCP and republishes the data over MQTT.
Because the Modbus and MQTT communication need to run concurrently, I need both connections to be asynchronous. Right now I'm trying to make the MQTT side asynchronous, without much success. Could I get some assistance?
I need to create a task that handles the connection and automatic reconnection. Once I understand how to do that, I can subscribe to a topic.
use futures::{executor::block_on, stream::StreamExt};
use paho_mqtt as mqtt;
use mqtt::{Client, ConnectOptions};
use std::process;
use std::time::Duration;

const BROKER_ADDRESS: &str = "192.168.200.254";
const USER_NAME: &str = "user";
const PASSWORD: &str = "password";
const CLIENT_NAME: &str = "client-name";
fn create_mqtt() -> Client {
    let host: String = "tcp://".to_owned() + BROKER_ADDRESS + ":1883";
    let create_options = mqtt::CreateOptionsBuilder::new()
        .server_uri(&host)
        .client_id(CLIENT_NAME.to_string())
        .finalize();
    // Create a client.
    let cli: Client = mqtt::Client::new(create_options).unwrap_or_else(|err| {
        println!("Error creating the client: {:?}", err);
        process::exit(1);
    });
    // println!("{}", &host);
    cli
}
fn client_opts() -> ConnectOptions {
    mqtt::ConnectOptionsBuilder::new()
        .keep_alive_interval(Duration::from_secs(20))
        .clean_session(false)
        .user_name(USER_NAME.to_string())
        .password(PASSWORD.to_string())
        .finalize()
}
#[tokio::main]
async fn main() {
    let cli: Client = create_mqtt();
    let cli_opts: ConnectOptions = client_opts();
    // Make the connection to the broker.
    // Note: `Client::connect` is the synchronous API and blocks until it finishes.
    if let Err(err) = cli.connect(cli_opts) {
        eprintln!("{}", err);
    }
    // Check the state only after the connection attempt.
    if cli.is_connected() {
        println!("Connected");
    } else {
        println!("Not connected");
    }
    println!("Hello, world!");
}
It would be helpful to have a function that handles connection and reconnection, so that I can run it as a non-blocking task alongside the main loop.
The paho_mqtt crate only provides a few examples, and they are not clear to a beginner. I would really appreciate any kind of help.
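To make the goal concrete, the connect-and-reconnect behaviour I have in mind looks roughly like the following library-independent sketch. It is only an assumption about how such a helper could be shaped: `connect_with_retry`, its `attempt` and `sleep` parameters, and the doubling delay are hypothetical and not part of paho_mqtt. It uses only the standard library, with the connect call and the wait passed in as closures.

```rust
use std::time::Duration;

/// Hypothetical helper (not a paho_mqtt API).
/// Calls `attempt` until it succeeds or `max_attempts` is reached.
/// After each failure it waits via `sleep`, doubling the delay up to `max_delay`.
/// Returns the successful value together with the number of attempts made.
fn connect_with_retry<T, E, A, S>(
    mut attempt: A,
    mut sleep: S,
    min_delay: Duration,
    max_delay: Duration,
    max_attempts: u32,
) -> Result<(T, u32), E>
where
    A: FnMut() -> Result<T, E>,
    S: FnMut(Duration),
{
    let mut delay = min_delay;
    let mut tries = 0;
    loop {
        tries += 1;
        match attempt() {
            Ok(value) => return Ok((value, tries)),
            // Out of attempts: hand the last error back to the caller.
            Err(err) if tries >= max_attempts => return Err(err),
            Err(_) => {
                sleep(delay);
                // Exponential backoff, capped at `max_delay`.
                delay = (delay * 2).min(max_delay);
            }
        }
    }
}
```

In a real program `attempt` would wrap the actual connect call and `sleep` would be `std::thread::sleep` (or an awaited timer in an async version). The crate's own automatic reconnect option may make a hand-written loop like this unnecessary.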
I got it working, but I'm not sure if it's correct:
#[tokio::main]
async fn main() {
    // Initialize the logger from the environment
    env_logger::init();
    info!("Starting the program");
    let cli: AsyncClient = mqtt_func::connect_to_mqtt().await;
    // Just wait for incoming messages (handled by the message callback).
    // `heartbeat` is defined elsewhere (not shown) and returns the updated counter.
    let mut k: u32 = 0;
    loop {
        // Use the async sleep so the Tokio worker thread is not blocked.
        tokio::time::sleep(Duration::from_millis(1000)).await;
        k = heartbeat(k);
        if k % 10 == 0 {
            if !cli.is_connected() {
                warn!("Can't publish. Not connected");
            } else {
                let msg: mqtt::Message = mqtt::Message::new("test", "Hello Rust MQTT world!", 0);
                info!("Publishing");
                let _tok = cli.publish(msg);
            }
        }
    }
}
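The `heartbeat` helper is not included in the snippet above. For anyone trying to follow the loop, a hypothetical stand-in could look like the sketch below. This assumes it does nothing more than advance the tick counter; the real function may do more, and `should_publish` is just a name introduced here for the `k % 10 == 0` check.

```rust
/// Hypothetical stand-in for the `heartbeat` helper that is not shown.
/// Assumption: it only advances the tick counter, wrapping instead of overflowing.
fn heartbeat(k: u32) -> u32 {
    k.wrapping_add(1)
}

/// True on every tenth tick, matching the `k % 10 == 0` check in the loop.
fn should_publish(k: u32) -> bool {
    k % 10 == 0
}
```

With a one-second tick, this publishes roughly once every ten seconds.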
pub async fn connect_to_mqtt() -> mqtt::AsyncClient {
    // Append a random suffix to the client ID (`gen_range` needs `rand::Rng` in scope).
    let mut rng = rand::thread_rng();
    let random_number: u32 = rng.gen_range(0..=100);
    // info!("Random number: {}", random_number);
    let host: String = "tcp://".to_owned() + BROKER_ADDRESS + ":1883";
    let create_options = mqtt::CreateOptionsBuilder::new()
        .server_uri(&host)
        .client_id(CLIENT_NAME.to_string() + &random_number.to_string())
        .finalize();
    // Create a client.
    let cli: AsyncClient = mqtt::AsyncClient::new(create_options).unwrap_or_else(|err| {
        // Log at error level, since the program exits here.
        log::error!("Error creating the client: {:?}", err);
        process::exit(1);
    });
    // Set a closure to be called whenever the client connection is established.
    // This is called after the initial connection and also after successful
    // reconnections.
    cli.set_connected_callback(|_| {
        info!("Connected.");
    });
    // Set a closure to be called whenever the client loses the connection.
    // It just reports the state to the user, and lets the library try to
    // reconnect.
    cli.set_connection_lost_callback(|_| {
        info!("Connection lost. Attempting reconnect...");
    });
    // Attach a closure to the client to receive callback
    // on incoming messages.
    cli.set_message_callback(on_message);
    // cli.set_message_callback(|_cli, msg| {
    //     if let Some(msg) = msg {
    //         let topic = msg.topic();
    //         let payload_str = msg.payload_str();
    //         println!("{} - {}", topic, payload_str);
    //     }
    // });
    // Define the set of options for the connection.
    // MIN_RECONNECT and MAX_RECONNECT are `Duration` constants defined elsewhere (not shown).
    let lwt = mqtt::Message::new("test/lwt", "[LWT] Async subscriber lost connection", 1);
    let conn_opts = mqtt::ConnectOptionsBuilder::new_v3()
        .keep_alive_interval(Duration::from_secs(20))
        .clean_session(true)
        .automatic_reconnect(MIN_RECONNECT, MAX_RECONNECT)
        .user_name(USER_NAME.to_string())
        .password(PASSWORD.to_string())
        .will_message(lwt)
        .finalize();
    // Make the connection to the broker.
    // The returned token is dropped here; the outcome is reported through the callbacks.
    info!("Connecting to the MQTT server...");
    cli.connect_with_callbacks(conn_opts, on_connect_success, on_connect_failure);
    cli
}
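One thing I'm unsure about in `connect_to_mqtt` is the client ID: a random suffix in `0..=100` has only 101 possible values, so two instances can end up with the same ID, and a broker drops the older connection when a second client connects with the same ID. Below is a minimal standard-library sketch of a less collision-prone ID. `unique_client_id` is a hypothetical helper, not part of paho_mqtt, and the ID format is only an assumption.

```rust
use std::process;
use std::time::{SystemTime, UNIX_EPOCH};

/// Hypothetical helper (not a paho_mqtt API).
/// Builds a client ID from a base name, the process ID, and the current time.
fn unique_client_id(base: &str) -> String {
    // Nanoseconds since the Unix epoch; falls back to 0 if the clock is before the epoch.
    let nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|d| d.as_nanos())
        .unwrap_or(0);
    format!("{}-{}-{}", base, process::id(), nanos)
}
```

The result could be passed to `.client_id(...)` in place of the random suffix. Two caveats: MQTT 3.1.1 only guarantees that brokers accept IDs of up to 23 alphanumeric characters, so a shorter suffix may be needed for strict brokers, and a changing ID only makes sense together with `clean_session(true)`, because a persistent session is tied to the ID.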