I can't figure out why my MQTT subscriber gives me `Stream error: Closed(..)`
without any details.
I receive part of the message and then get the error.
2024-06-28T12:49:07.915945141+02:00 DEBUG paho_mqtt_c - 20240628 124906.261 m->c->connect_state = 0
2024-06-28T12:49:07.916177041+02:00 DEBUG paho_mqtt_c - 20240628 124906.261 11 my_backend <- PUBLISH msgid: 0 qos: 0 retained: 0 payload len(1172): {"host":"example.com
2024-06-28T12:49:07.916257531+02:00 DEBUG paho_mqtt_c - 20240628 124906.261 Calling messageArrived for client isumis_backend, queue depth 0
2024-06-28T12:49:07.916308994+02:00 DEBUG paho_mqtt::async_client - Message arrived. Client: 0x7fb9f0024160, topic: 0x7fb9f0026c10 len 31 cmsg: 0x7fb9f00240a0: MQTTAsync_message { struct_id: [77, 81, 84, 77], struct_version: 1, payloadlen: 1172, payload: 0x7fb9ec002610, qos: 0, retained: 0, dup: 0, msgid: 0, properties: MQTTProperties { count: 0, max_count: 0, length: 0, array: 0x0 } }
2024-06-28T12:49:07.916444203+02:00 ERROR paho_mqtt::async_client - Stream error: Closed(..)
/// Creates an MQTT client with the fixed ID `isumis_backend` and connects it to the broker.
///
/// The broker URI is taken from the first command-line argument when present;
/// otherwise it is read from the `MQTT_CON` environment variable.
///
/// # Errors
///
/// Returns the `paho_mqtt` error if the connect request fails, or if the
/// follow-up reconnect attempt fails when the client does not report itself
/// as connected afterwards.
///
/// # Panics
///
/// Panics if no command-line argument is given and `MQTT_CON` is not set.
///
/// Note that a failure to *create* the client is logged and terminates the
/// process with exit status 1 instead of being returned.
pub async fn mqtt_connect() -> Result<AsyncClient, paho_mqtt::errors::Error> {
    // Only consult the environment when no URI was passed on the command line,
    // so a CLI-provided URI does not additionally require MQTT_CON to be set.
    let host: String = std::env::args()
        .nth(1)
        .unwrap_or_else(|| std::env::var("MQTT_CON").expect("MQTT_CON must be set."));
    let id: &str = "isumis_backend";

    // Create the client. Use an ID for a persistent session.
    // A real system should try harder to use a unique ID.
    let create_opts: CreateOptions = mqtt::CreateOptionsBuilder::new()
        .server_uri(host)
        .client_id(id)
        .finalize();
    let cli: AsyncClient = AsyncClient::new(create_opts).unwrap_or_else(|e| {
        error!("Error creating the client: {}", e);
        process::exit(1);
    });

    // Define the set of options for the connection, including the last-will
    // message the broker publishes if this client drops unexpectedly.
    // NOTE(review): SessionExpiryInterval is an MQTT v5 property while
    // `clean_session` is the v3 session flag — confirm which protocol version
    // is intended here.
    let lwt: Message = Message::new("test", "Async subscriber lost connection", mqtt::QOS_1);
    let conn_opts: ConnectOptions = mqtt::ConnectOptionsBuilder::new()
        .clean_session(false)
        .properties(mqtt::properties![mqtt::PropertyCode::SessionExpiryInterval => 3600])
        .will_message(lwt)
        .finalize();

    // Make the connection to the broker.
    info!("Connecting to the MQTT server...");
    cli.connect(conn_opts).await?;

    if cli.is_connected() {
        info!("Connection to MQTT successful.");
    } else {
        // Await the reconnect so a failure is reported to the caller rather
        // than being silently dropped together with the token.
        warn!("Connection unsuccessful, reconnecting...");
        cli.reconnect().await?;
    }
    Ok(cli)
}
/// Connects to the broker, subscribes to `TOPICS` and processes incoming
/// messages, driving the whole session through `block_on`.
///
/// Any error from connecting or subscribing is logged; nothing is returned.
pub fn subscriber() {
    let session = async {
        let mut client: AsyncClient = mqtt_connect().await?;

        // The stream is requested here, i.e. once `mqtt_connect` has already
        // connected the client.
        // NOTE(review): messages delivered between connect and this call may
        // not reach the stream — confirm whether the stream should be set up
        // before connecting.
        let incoming: mqtt::AsyncReceiver<Option<Message>> = client.get_stream(25);

        info!("Subscribing to topics: {:?}", TOPICS);
        // `subscribe_many_with_options` with
        // `SubscribeOptions::with_retain_as_published()` was tried here as an
        // alternative; the plain subscription is what is in use.
        client.subscribe_many(TOPICS, QOS).await?;

        info!("Waiting for messages...");
        // There is no clean shutdown/disconnect path. Killing the app (^C or
        // similar) is an unexpected drop from the server's point of view, so
        // it should then emit the LWT message.
        message_loop(client, incoming).await;

        // Pin the error type of the async block so `?` above can infer it.
        Ok::<(), mqtt::Error>(())
    };

    if let Err(err) = block_on(session) {
        error!("{}", err);
    }
}
async fn message_loop(cli: AsyncClient, mut strm: mqtt::AsyncReceiver<Option<Message>>) -> () {
while let Some(msg_opt) = strm.next().await {
if let Some(sub_msg) = msg_opt {
if sub_msg.retained() {
info!("(R) ");
}
let msg: &str = match std::str::from_utf8(sub_msg.payload()) {
Ok(v) => v,
Err(e) => panic!("Invalid UTF-8 sequence: {e}"),
};
let payload_json: Value = serde_json::from_str(msg).unwrap();
info!("New message: {}", &payload_json);
match sub_msg.topic() {
TRAFFIC => network_traffic(payload_json)
.await
.expect("Could not handle network traffic"),
FILE_INFO => handle_file_info(msg)
.await
.expect("Could not handle scanned app results"),
BLACKLIGHT => handle_blacklight(payload_json)
.await
.expect("Could not handle blacklight results"),
&_ => error!("No suitable topic name found")
};
} else {
// A "None" means we were disconnected. Try to reconnect...
warn!("Lost connection. Attempting reconnect.");
while let Err(err) = cli.reconnect().await {
error!("Error reconnecting: {}", err);
actix_web::rt::time::sleep(Duration::from_millis(1000)).await;
}
}
}
}