Paho_mqtt async

After spending a considerable amount of time reading and studying Rust, I've decided to start rewriting some code. I have a Python program that I want to port. Essentially, it reads data from a device over Modbus TCP and republishes it over MQTT.

Given that both Modbus and MQTT communication need to happen concurrently, I need the connections to be asynchronous. At this juncture, I'm attempting to make MQTT asynchronous without much success. Could I get some assistance?
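
One way to picture the concurrent shape is a producer/consumer pair joined by a channel: one worker polls the device, the other publishes. The sketch below uses only the standard library (threads and `std::sync::mpsc`) so it runs without any crates. `Reading`, `to_mqtt`, `run_bridge`, and the `device/register/<n>` topic layout are invented for illustration and are not part of any Modbus or MQTT library. With tokio, the same structure would presumably use spawned tasks and `tokio::sync::mpsc` instead.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical stand-in for one Modbus poll result.
#[derive(Debug, Clone, PartialEq)]
struct Reading {
    register: u16,
    value: u16,
}

/// Turn a reading into an MQTT-style (topic, payload) pair.
/// The topic layout is invented for this example.
fn to_mqtt(reading: &Reading) -> (String, String) {
    (
        format!("device/register/{}", reading.register),
        reading.value.to_string(),
    )
}

/// Run a "Modbus" producer and an "MQTT" consumer concurrently and
/// return what the consumer would have published.
fn run_bridge(readings: Vec<Reading>) -> Vec<(String, String)> {
    let (tx, rx) = mpsc::channel::<Reading>();

    // Producer: stands in for the Modbus polling loop.
    let producer = thread::spawn(move || {
        for r in readings {
            // A send error means the consumer is gone; stop polling.
            if tx.send(r).is_err() {
                break;
            }
        }
        // `tx` is dropped here, which ends the consumer's loop.
    });

    // Consumer: stands in for the MQTT publishing loop.
    let consumer = thread::spawn(move || {
        rx.iter().map(|r| to_mqtt(&r)).collect::<Vec<_>>()
    });

    producer.join().expect("producer panicked");
    consumer.join().expect("consumer panicked")
}
```

For example, `run_bridge(vec![Reading { register: 100, value: 42 }])` returns a single pair, `("device/register/100", "42")`. Because the two sides only share a channel, a slow publisher does not stall the polling side.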

I need to create a task to handle connection and auto-reconnect. If I understand how to do this, I can then subscribe to a topic.

const BROKER_ADDRESS: &str = "192.168.200.254";
const USER_NAME: &str = "user";
const PASSWORD: &str = "password";
const CLIENT_NAME: &str = "client-name";

use futures::{executor::block_on, stream::StreamExt};
use mqtt::{Client, ConnectOptions};
use paho_mqtt as mqtt;
use std::process;
use std::time::Duration;

fn create_mqtt() -> Client {
    let host: String = "tcp://".to_owned() + BROKER_ADDRESS + ":1883";

    let create_options = mqtt::CreateOptionsBuilder::new()
        .server_uri(&host)
        .client_id(CLIENT_NAME.to_string())
        .finalize();

    // Create a client.
    let cli: Client = mqtt::Client::new(create_options).unwrap_or_else(|err| {
        // Report the failure on stderr before exiting.
        eprintln!("Error creating the client: {:?}", err);
        process::exit(1);
    });

    // println!("{}", &host);
    cli
}

fn client_opts() -> ConnectOptions {
    mqtt::ConnectOptionsBuilder::new()
        .keep_alive_interval(Duration::from_secs(20))
        .clean_session(false)
        .user_name(USER_NAME.to_string())
        .password(PASSWORD.to_string())
        .finalize()
}

#[tokio::main]
async fn main() {
    let cli: Client = create_mqtt();
    let cli_opts: ConnectOptions = client_opts();
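    // Make the connection to the broker. This is the synchronous client,
    // so `connect` blocks until the attempt succeeds or fails.
    if let Err(err) = cli.connect(cli_opts) {
        eprintln!("{}", err);
    }

    // Report the connection state after the attempt.
    if cli.is_connected() {
        println!("Connected");
    } else {
        println!("Not connected");
    }
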
    println!("Hello, world!");
}

Ideally, I'd like a function that handles both connecting and reconnecting, which I could run as a non-blocking task alongside the main loop.
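
A minimal sketch of that idea, assuming nothing about paho_mqtt itself: a background worker keeps retrying a connect call and reports progress over a channel, while the main loop polls for events without blocking. `spawn_connector`, `poll_event`, `ConnEvent`, and the `try_connect` closure are hypothetical names, and the closure stands in for whatever real connect call is used. The sketch covers only the initial connection; handling a dropped connection would mean re-entering the same loop.

```rust
use std::sync::mpsc::{self, Receiver, TryRecvError};
use std::thread;
use std::time::Duration;

/// Events the background connection worker reports to the main loop.
#[derive(Debug, PartialEq)]
enum ConnEvent {
    Retrying { attempt: u32 },
    Connected { attempts: u32 },
}

/// Spawn a thread that calls `try_connect` until it succeeds,
/// sleeping `retry_delay` between attempts.
/// `try_connect` is a hypothetical stand-in for the real connect call.
fn spawn_connector<F>(mut try_connect: F, retry_delay: Duration) -> Receiver<ConnEvent>
where
    F: FnMut() -> Result<(), String> + Send + 'static,
{
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        let mut attempt = 0;
        loop {
            attempt += 1;
            match try_connect() {
                Ok(()) => {
                    let _ = tx.send(ConnEvent::Connected { attempts: attempt });
                    break;
                }
                Err(_) => {
                    // Stop retrying if the main loop has gone away.
                    if tx.send(ConnEvent::Retrying { attempt }).is_err() {
                        break;
                    }
                    thread::sleep(retry_delay);
                }
            }
        }
    });
    rx
}

/// Poll for an event without blocking, so the caller's loop can keep
/// doing other work. Returns `None` when there is nothing new.
fn poll_event(rx: &Receiver<ConnEvent>) -> Option<ConnEvent> {
    match rx.try_recv() {
        Ok(ev) => Some(ev),
        Err(TryRecvError::Empty) | Err(TryRecvError::Disconnected) => None,
    }
}
```

The main loop would call `poll_event(&rx)` once per iteration and carry on with its other work whenever it gets `None`.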

The paho_mqtt crate only provides a few examples, and they are not clear to a beginner. I would really appreciate any kind of help.

I got it working, but I'm not sure if it's correct:

#[tokio::main]
async fn main() {
    // Initialize the logger from the environment
    env_logger::init();
    info!("Starting the program");

    let cli: AsyncClient = mqtt_func::connect_to_mqtt().await;

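    // Sleep one second per iteration; publish a test message whenever
    // `k` is a multiple of 10.
    let mut k: u32 = 0;
    loop {
        // Use the async sleep: `thread::sleep` would block the runtime thread.
        tokio::time::sleep(Duration::from_millis(1000)).await;
        k = heartbeat(k);

        if k % 10 == 0 {
            if !cli.is_connected() {
                warn!("Can't publish. Not connected");
            } else {
                let msg: mqtt::Message = mqtt::Message::new("test", "Hello Rust MQTT world!", 0);
                info!("Publishing");
                // The delivery token is dropped, so the result is not checked.
                let _tok = cli.publish(msg);
            }
        }
    }
}
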
// Lives in the `mqtt_func` module (called above as `mqtt_func::connect_to_mqtt()`).
pub async fn connect_to_mqtt() -> mqtt::AsyncClient {
    let mut rng = rand::thread_rng();
    let random_number: u32 = rng.gen_range(0..=100);
    // info!("Random number: {}", random_number);

    let host: String = "tcp://".to_owned() + BROKER_ADDRESS + ":1883";
    let create_options = mqtt::CreateOptionsBuilder::new()
        .server_uri(&host)
        .client_id(CLIENT_NAME.to_string() + &random_number.to_string())
        .finalize();
    
    // Create a client.
    let cli: AsyncClient = mqtt::AsyncClient::new(create_options).unwrap_or_else(|err| {
        // Log at error level so the message is visible with default log filters.
        log::error!("Error creating the client: {:?}", err);
        process::exit(1);
    });

    // Set a closure to be called whenever the client connection is established.
    // This is called after the initial connection and also after successful
    // reconnections.
    cli.set_connected_callback(|_| {
        info!("Connected.");
    });

    // Set a closure to be called whenever the client loses the connection.
    // It just reports the state to the user, and lets the library try to
    // reconnect.
    cli.set_connection_lost_callback(|_| {
        info!("Connection lost. Attempting reconnect...");
    });

    // Attach a closure to the client to receive callback
    // on incoming messages.
    cli.set_message_callback(on_message);
    // cli.set_message_callback(|_cli, msg| {
    //     if let Some(msg) = msg {
    //         let topic = msg.topic();
    //         let payload_str = msg.payload_str();
    //         println!("{} - {}", topic, payload_str);
    //     }
    // });

    // Define the set of options for the connection
    let lwt = mqtt::Message::new("test/lwt", "[LWT] Async subscriber lost connection", 1);

    let conn_opts = mqtt::ConnectOptionsBuilder::new_v3()
        .keep_alive_interval(Duration::from_secs(20))
        .clean_session(true)
        .automatic_reconnect(MIN_RECONNECT, MAX_RECONNECT)
        .user_name(USER_NAME.to_string())
        .password(PASSWORD.to_string())
        .will_message(lwt)
        .finalize();

    // Make the connection to the broker
    info!("Connecting to the MQTT server...");
    cli.connect_with_callbacks(conn_opts, on_connect_success, on_connect_failure);
    cli
}
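
For reference, here is a small sketch of what `automatic_reconnect(MIN_RECONNECT, MAX_RECONNECT)` is expected to do with its two arguments. As far as I understand the underlying Paho C library, the retry interval starts at the minimum and doubles after each failed attempt until it reaches the maximum. The function `reconnect_delays` is hypothetical and only models that schedule; it does not call into paho_mqtt, and the values of 1 s and 30 s used in the note below are assumptions, since the actual constants are not shown above.

```rust
use std::time::Duration;

/// Delays before each reconnect attempt: start at `min`, double after
/// every failure, and never exceed `max`.
fn reconnect_delays(min: Duration, max: Duration, attempts: usize) -> Vec<Duration> {
    let mut delays = Vec::with_capacity(attempts);
    // Guard against a minimum that is larger than the maximum.
    let mut next = min.min(max);
    for _ in 0..attempts {
        delays.push(next);
        next = next.saturating_mul(2).min(max);
    }
    delays
}
```

With a minimum of 1 s and a maximum of 30 s, seven attempts give delays of 1, 2, 4, 8, 16, 30, and 30 seconds.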