Hi, I'm trying to consume messages from a queue and do some work on them, using the default example from the lapin documentation. The problem is that if any error occurs in that code, the program panics and exits. I want it to loop forever: errors, connection problems and so on should not be a reason for the program to exit. If there is a problem, it should wait until the error is resolved and then continue working.
For example, when I restart the queue server, or delete and redeclare the queue, the code must keep working without exiting.
If there is an error: wait. Once the error is resolved: continue. (Never panic.)
How can I revise this code so that it keeps working regardless of queue server errors?

use futures_executor::LocalPool;
use futures_util::{future::FutureExt, stream::StreamExt, task::LocalSpawnExt};
use lapin::{
    options::*, publisher_confirm::Confirmation, types::FieldTable, BasicProperties, Connection,
    ConnectionProperties, Result,
};
use std::{thread, time};

fn main() -> Result<()> {
    let addr = std::env::var("AMQP_ADDR").unwrap_or_else(|_| "amqp://127.0.0.1:5672/%2f".into());
    let mut executor = LocalPool::new();
    let spawner = executor.spawner();

    executor.run_until(async {
        let conn = Connection::connect(&addr, ConnectionProperties::default()).await?;
        println!("CONNECTED");

        let channel_a = conn.create_channel().await?;
        let channel_b = conn.create_channel().await?;

        let queue = channel_a
            .queue_declare(
                "hello",
                QueueDeclareOptions::default(),
                FieldTable::default(),
            )
            .await?;
        println!("Declared queue {:?}", queue);

        let consumer = channel_b
            .clone()
            .basic_consume(
                "hello",
                "my_consumer",
                BasicConsumeOptions::default(),
                FieldTable::default(),
            )
            .await?;

        let _consumer = spawner.spawn_local(async move {
            println!("will consume");
            consumer
                .for_each(move |delivery| {
                    let delivery = delivery.expect("error caught in in consumer");
                    let result = String::from_utf8_lossy(&delivery.data).to_string();
                    println!("{:?}", result);
                    channel_b
                        .basic_ack(delivery.delivery_tag, BasicAckOptions::default())
                        .map(|_| ())
                })
                .await
        });

        let payload = b"Hello world!";
        loop {
            thread::sleep(time::Duration::from_secs(3));
            let confirm = channel_a
                .basic_publish(
                    "",
                    "hello",
                    BasicPublishOptions::default(),
                    payload.to_vec(),
                    BasicProperties::default(),
                )
                .await?
                .await?;
            assert_eq!(confirm, Confirmation::NotRequested);
        }
    })
}
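
The behaviour asked for above (wait on error, continue once it is resolved) amounts to a retry loop around the whole connect / declare / consume sequence. Below is a minimal sketch of that shape using only the standard library. `retry_until_ok` is a hypothetical helper, not part of lapin, and the closure in `main` is only a stand-in for the real connection code. In the real program, the `expect` in the consumer and the `?` exits would also have to become returned errors, so that a failure lands back in the loop instead of ending the process.

```rust
use std::{fmt::Debug, thread, time::Duration};

/// Hypothetical helper (not a lapin API): calls `op` until it returns `Ok`,
/// sleeping between failed attempts. The delay doubles after each failure
/// and is capped at `max_delay`.
fn retry_until_ok<T, E: Debug>(
    mut op: impl FnMut() -> Result<T, E>,
    initial_delay: Duration,
    max_delay: Duration,
) -> T {
    let mut delay = initial_delay;
    loop {
        match op() {
            Ok(value) => return value,
            Err(err) => {
                // Log the error and wait instead of panicking.
                eprintln!("error: {:?}; retrying in {:?}", err, delay);
                thread::sleep(delay);
                delay = (delay * 2).min(max_delay);
            }
        }
    }
}

fn main() {
    // Stand-in for "connect, declare the queue, start consuming":
    // it fails on the first two attempts and succeeds on the third.
    let mut attempts = 0;
    let result = retry_until_ok(
        || {
            attempts += 1;
            if attempts < 3 {
                Err("broker unavailable")
            } else {
                Ok("connected")
            }
        },
        Duration::from_millis(10),
        Duration::from_millis(100),
    );
    println!("{} after {} attempts", result, attempts);
}
```

To keep the program alive permanently, the same idea can be wrapped in an outer `loop`, so that a session which ends with an error (server restart, deleted queue) is simply started again after the delay.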