How can I do forever working async queue consumer

Hi, I'm trying to get messages from queue and do some jobs using by default lapin document example. But there is some problem for me, in that code if there is some error, program panic and closed. I want forever loop. Errors, connection problems etc. should not reason for close to program, if any problem, it sould wait for solution the error and continue to work.

For example; When I restart the queue server or delete/remove and redeclare queue, code must working without close itself.

If there is an error: wait, if error resolved: continue. ( Never panic )

How can I revise this code for working regardless of queue server error.

use futures_executor::LocalPool;
use futures_util::{future::FutureExt, stream::StreamExt, task::LocalSpawnExt};
use lapin::{
    options::*, publisher_confirm::Confirmation, types::FieldTable, BasicProperties, Connection,
    ConnectionProperties, Result,
};
use std::{thread, time};

fn main() -> Result<()> {

    let addr = std::env::var("AMQP_ADDR").unwrap_or_else(|_| "amqp://127.0.0.1:5672/%2f".into());
    let mut executor = LocalPool::new();
    let spawner = executor.spawner();

    executor.run_until(async {
        let conn = Connection::connect(&addr, ConnectionProperties::default()).await?;

        println!("CONNECTED");

        let channel_a = conn.create_channel().await?;
        let channel_b = conn.create_channel().await?;

        let queue = channel_a
            .queue_declare(
                "hello",
                QueueDeclareOptions::default(),
                FieldTable::default(),
            )
            .await?;

        println!("Declared queue {:?}", queue);

        let consumer = channel_b
            .clone()
            .basic_consume(
                "hello",
                "my_consumer",
                BasicConsumeOptions::default(),
                FieldTable::default(),
            )
            .await?;
        let _consumer = spawner.spawn_local(async move {
            println!("will consume");
            consumer
                .for_each(move |delivery| {

                    let delivery = delivery.expect("error caught in in consumer");
                    let result = String::from_utf8_lossy(&delivery.data).to_string();
                    println!("{:?}",result);
                    channel_b
                        .basic_ack(delivery.delivery_tag, BasicAckOptions::default())
                        .map(|_| ())
                })
                .await
        });

        let payload = b"Hello world!";

        loop {
            thread::sleep(time::Duration::from_secs(3));
            let confirm = channel_a
                .basic_publish(
                    "",
                    "hello",
                    BasicPublishOptions::default(),
                    payload.to_vec(),
                    BasicProperties::default(),
                )
                .await?
                .await?;
            assert_eq!(confirm, Confirmation::NotRequested);
        }
    })
}

First of all, you want to get rid of the Result return type on main. All errors bubbling up until main will crash the program.

Secondly you want to handle errors instead of bubbling them up with ? most of the time. Generally that implies matching on the Result.

There are helpers to check whether your code can panic like: no-panic, panic-never and safety-guard.

As for network trouble, you will have to look into either crates that provide resilient networking, eg. automatic reconnect with backoff, or you will have to write that logic yourself.

You want to write unit and integration tests for your error handling! Make sure you trigger all sorts of errors and verify your code recovers from it correctly. Check out this article: Error Handling in a Correctness-Critical Rust Project | sled-rs.github.io for some ideas.

There is surely more to be said about this topic, but this should get you started.

Thank you, I will check your all solution suggestions.

This topic was automatically closed 90 days after the last reply. New replies are no longer allowed.