Need some help on some mqtt stuff

Hi

I have some mqtt code for subscribe and publish using paho mqtt rust: https://github.com/eclipse/paho.mqtt.rust

In main I have called the both of the functions but only subscribe functions seems to be working and publish function does execute when I call it. I was wondering what would be the problem and how to fix this issue. Also how can I put this closure:

    // Attach a closure to the client to receive callback
    // on incoming messages.
    cli.set_message_callback(|_cli,msg| {
        if let Some(msg) = msg {
            let topic = msg.topic();
            let payload_str = msg.payload_str();
            println!("{} - {}", topic, payload_str);
        }
    });

into a on_message function so that whenever I subscribe and then publish the topic and and payload is printed via the callback from inside an on_message function.

extern crate log;
extern crate env_logger;
extern crate paho_mqtt as mqtt;

use std::{env, process, thread};
use std::time::Duration;
use futures::Future;

// The topics to which we subscribe.
const TOPICS: &[&str] = &[ "test", "hello" ];
const QOS: &[i32] = &[1, 1];

// Callback for a successful connection to the broker.
// We subscribe to the topic(s) we want here.
fn on_connect_success(cli: &mqtt::AsyncClient, _msgid: u16) {
    println!("Connection succeeded");
    // Subscribe to the desired topic(s).
    cli.subscribe_many(TOPICS, QOS);
    println!("Subscribing to topics: {:?}", TOPICS);
    // TODO: This doesn't yet handle a failed subscription.
}

// Callback for a failed attempt to connect to the server.
// We simply sleep and then try again.
//
// Note that normally we don't want to do a blocking operation or sleep
// from  within a callback. But in this case, we know that the client is
// *not* conected, and thus not doing anything important. So we don't worry
// too much about stopping its callback thread.
fn on_connect_failure(cli: &mqtt::AsyncClient, _msgid: u16, rc: i32) {
    println!("Connection attempt failed with error code {}.\n", rc);
    thread::sleep(Duration::from_millis(2500));
    cli.reconnect_with_callbacks(on_connect_success, on_connect_failure);
}

/////////////////////////////////////////////////////////////////////////////

fn subscribe() {
    // Initialize the logger from the environment
    env_logger::init();

    let host = env::args().nth(1).unwrap_or_else(||
        "test.mosquitto.org".to_string()
    );

    // Create the client. Use an ID for a persistent session.
    // A real system should try harder to use a unique ID.
    let create_opts = mqtt::CreateOptionsBuilder::new()
        .server_uri(host)
        .client_id("rust_async_subscribe")
        .finalize();

    // Create the client connection
    let mut cli = mqtt::AsyncClient::new(create_opts).unwrap_or_else(|e| {
        println!("Error creating the client: {:?}", e);
        process::exit(1);
    });

    // Set a closure to be called whenever the client loses the connection.
    // It will attempt to reconnect, and set up function callbacks to keep
    // retrying until the connection is re-established.
    cli.set_connection_lost_callback(|cli: &mqtt::AsyncClient| {
        println!("Connection lost. Attempting reconnect.");
        thread::sleep(Duration::from_millis(2500));
        cli.reconnect_with_callbacks(on_connect_success, on_connect_failure);
    });

    // Attach a closure to the client to receive callback
    // on incoming messages.
    cli.set_message_callback(|_cli,msg| {
        if let Some(msg) = msg {
            let topic = msg.topic();
            let payload_str = msg.payload_str();
            println!("{} - {}", topic, payload_str);
        }
    });

    // Define the set of options for the connection
    let lwt = mqtt::Message::new("test", "Async subscriber lost connection", 1);

    let conn_opts = mqtt::ConnectOptionsBuilder::new()
        .keep_alive_interval(Duration::from_secs(20))
        .mqtt_version(mqtt::MQTT_VERSION_3_1_1)
        .clean_session(true)
        .will_message(lwt)
        .finalize();

    // Make the connection to the broker
    println!("Connecting to the MQTT server...");
    cli.connect_with_callbacks(conn_opts, on_connect_success, on_connect_failure);

    // Just wait for incoming messages.
    loop {
        thread::sleep(Duration::from_millis(1000));
    }

    // Hitting ^C will exit the app and cause the broker to publish the
    // LWT message since we're not disconnecting cleanly.
}

fn publish() {
    // Initialize the logger from the environment
    env_logger::init();

    // Command-line option(s)
    let host = env::args().nth(1).unwrap_or_else(||
        "test.mosquitto.org".to_string()
    );

    // Create a client to the specified host, no persistence
    let create_opts = mqtt::CreateOptionsBuilder::new()
        .server_uri(host)
        .persistence(mqtt::PersistenceType::None)
        .finalize();

    let cli = mqtt::AsyncClient::new(create_opts).unwrap_or_else(|err| {
        println!("Error creating the client: {}", err);
        process::exit(1);
    });

    // Connect with default options
    let conn_opts = mqtt::ConnectOptions::new();

    // Create a message and publish it
    println!("Publishing a message on the 'test' topic");
    let msg = mqtt::Message::new("test", "Hello Rust MQTT world!", 0);
    let tok = cli.publish(msg);
}

fn main() {
    subscribe();
    publish();
}

Thanks

Your subscribe never returns because you do a loop {} in it. Nothing after that will ever run.

So do I need to get rid of this loop?

Yes, probably. You should probably also not drop your AsyncClient, because that will destroy everything you set up in the subscribe function. In general, you should understand the code you're trying to write rather than just mashing examples together without any understanding of what they are trying to show you.

I tried getting rid of the loop but the problem is not fixed.

Finished dev [unoptimized + debuginfo] target(s) in 14.47s
 Running `/home/pi/rust/mqtt_test/target/debug/mqtt_test`
Connecting to the MQTT server...
Connection succeeded
Subscribing to topics: ["test", "hello"]
Segmentation fault

I am getting a segmentation fault and I dont what that is or how to fix it so that I print the publish messages.

This topic was automatically closed 90 days after the last reply. New replies are no longer allowed.