Lapin (+ Tokio): using an existing RabbitMQ channel/connection in the message handler

Hello Rustaceans

I'm a junior software developer learning Rust and I'm currently making a RabbitMQ server with Lapin and Tokio. My code is based on the example code provided by Lapin which can be found here.

I've already managed to connect to and receive messages from the message broker, but I have a problem. Messages that are published on a certain queue expect an RPC call back. For this to work, I need a channel to the message broker so the response can be published on the right queue. It would make sense to use the already existing channel or make a new channel based on the current connection, but this is where I hit a roadblock.

The message handler for a queue in Lapin expects a closure with a static lifetime (lines 39-58). This means that the closure can't use variables outside its scope which aren't statics. As a result, I can't use the existing channel or connection with the message broker.

As a temporary workaround, I'm creating a new connection and channel for every RPC call, but this is not the way it's supposed to be. Luckily, the temporary workaround is sufficient since this is a personal project and will only be used on a very small scale. How can I solve this situation the right way?

I think you may be misunderstanding what a 'static bound means. A 'static closure can still capture values from its environment; it just can't borrow from its (non-'static) environment.
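To make that concrete, here is a minimal std-only sketch. The helper `run_static` and the function `greeting_via_static_closure` are made up for illustration and are not part of lapin; the point is only that a closure which owns its captures satisfies a 'static bound even though the captured value is an ordinary local.

```rust
// Hypothetical helper: accepts any closure that satisfies a `'static` bound.
fn run_static<F>(f: F) -> String
where
    F: Fn() -> String + 'static,
{
    f()
}

fn greeting_via_static_closure() -> String {
    // An ordinary local variable, not a `static` item.
    let name = String::from("lapin");

    // `move` transfers ownership of `name` into the closure, so the closure
    // holds no borrow of this stack frame and therefore is `'static`.
    run_static(move || format!("hello, {}", name))

    // Without `move`, the closure would borrow `name` and the `'static`
    // bound on `run_static` would be rejected by the compiler.
}

fn main() {
    println!("{}", greeting_via_static_closure());
}
```

So the bound restricts borrowing, not capturing: anything the closure owns outright can come along.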

I'm not particularly familiar with RabbitMQ, so I kind of had to guess at what specifically you weren't able to do in your code. Modifying the example code to capture channel in the set_delegate closure sounds like approximately what you're trying to do though, so here's that:

consumer.set_delegate({
    // Clone the channel so we can move the new instance into the
    // closure and the rest of the code can still use the
    // outer channel variable.
    let channel = channel.clone();

    // Return the closure from this block.
    move |delivery: DeliveryResult| {
        // Create another clone of the channel that can be moved into
        // the future returned by the async move block.
        let channel = channel.clone();
        async move {
            // Do something with the channel instead of dropping it.
            drop(channel);

            let delivery = match delivery {
                // Carries the delivery alongside its channel
                Ok(Some(delivery)) => delivery,
                // The consumer got canceled
                Ok(None) => return,
                // Carries the error and is always followed by Ok(None)
                Err(error) => {
                    // dbg! does not interpolate `{}`, so print the error with eprintln! instead.
                    eprintln!("Failed to consume queue message: {}", error);
                    return;
                }
            };

            // Do something with the delivery data (The message payload)

            delivery
                .ack(BasicAckOptions::default())
                .await
                .expect("Failed to ack send_webhook_event message");
        }
    }
});
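If you want to play with the two-clone pattern without a broker, here is a std-only sketch. `FakeChannel`, `deliver_all`, `block_on` and `reply_to_all` are all stand-ins invented for this example; the bounds on `deliver_all` are only modelled loosely on what a delegate-style API would ask for, so treat it as an illustration of the ownership flow rather than of lapin itself.

```rust
use std::future::Future;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical stand-in for a channel: a cheap-to-clone handle to shared state.
#[derive(Clone)]
struct FakeChannel {
    published: Arc<Mutex<Vec<String>>>,
}

impl FakeChannel {
    fn publish(&self, msg: String) {
        self.published.lock().unwrap().push(msg);
    }
}

// Waker that does nothing; enough for futures that never actually suspend.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Tiny executor for this sketch only: polls the future until it is ready.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

// Hypothetical stand-in for registering a delegate: it calls the handler
// once per delivery, so the handler must be `Fn`, not `FnOnce`.
fn deliver_all<D, Fut>(handler: D, deliveries: &[&str])
where
    D: Fn(String) -> Fut + Send + Sync + 'static,
    Fut: Future<Output = ()> + Send + 'static,
{
    for d in deliveries {
        block_on(handler(d.to_string()));
    }
}

fn reply_to_all(deliveries: &[&str]) -> Vec<String> {
    let channel = FakeChannel { published: Arc::new(Mutex::new(Vec::new())) };
    deliver_all(
        {
            // First clone: owned by the closure, outer `channel` stays usable.
            let channel = channel.clone();
            move |payload: String| {
                // Second clone: owned by the future created for this call.
                let channel = channel.clone();
                async move {
                    channel.publish(format!("reply to {}", payload));
                }
            }
        },
        deliveries,
    );
    // The outer handle is still valid here because only clones were moved.
    let out = channel.published.lock().unwrap().clone();
    out
}

fn main() {
    println!("{:?}", reply_to_all(&["ping"]));
}
```

The handler runs once per delivery, which is why each call needs its own clone to hand to the future.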

Thank you very much for the fast and clear response.

Yes indeed, that was a misunderstanding on my part, but this solution doesn't yet solve my problem. A channel isn't cloneable. The same goes for the connection to the message broker.

That code compiled for me, and the lapin docs indicate that Channel implements Clone. Are you using a different kind of channel?

Oh damn, legit ? I'll try it out later today when I get home and update you then.

Awesome, it works !!!

I've been fiddling around with the code a bit as well to see how it works. I discovered that it's possible to remove the first clone. The code compiles but I'm not sure if I'm overlooking something or if that is fine as well ?

consumer.set_delegate(move |delivery_result: DeliveryResult| {
    let channel: Channel = channel.clone();
    async move {
        // Do something with the channel instead of dropping it.
        drop(channel);
    }
});

Also, my bad on saying that Channel doesn't implement Clone. I didn't have my code in front of me when I said that; I just remembered that I got an error when trying to clone the channel. I recreated the error I got in the past, and thanks to your solution I (think I) finally understand the problem with my original code (see the compiler diagnostic below).

error: lifetime may not live long enough
  --> src\controllers\rabbitmq.rs:61:70
   |
61 |           consumer.set_delegate(move |delivery_result: DeliveryResult| async {
   |  _______________________________--------------------------------------_^
   | |                               |                                    |
   | |                               |                                    return type of closure `impl Future<Output = ()>` contains a lifetime `'2`
   | |                               lifetime `'1` represents this closure's body
62 | |                 let channel: Channel = channel.clone();
...  |
80 | |             }
81 | |         });
   | |_________^ returning this value requires that `'1` must outlive `'2`
   |
   = note: closure implements `Fn`, so references to captured variables can't escape the closure

Thanks to your solution, I know that the channel needs to be cloned in lifetime '1 first and then lifetime '2 in order for it to be used. But when omitting the clone in lifetime '1, the compiler doesn't have enough info about the lifetime of the 2nd clone in relation to lifetime '1. Therefore it says that the clone should outlive lifetime '1 but since the compiler isn't sure it does, it throws an error. Is this explanation correct ?

Yeah, as written you never use the channel again after set_delegate, so you can just move the channel into the closure. I decided to assume a real app might need to use the channel again, so I cloned it to be safe.

It looks like that version of the code is using a plain async block and not an async move block. So the future created by the block only borrows channel from the closure, which means the closure and future need to have their lifetimes tied together.

In my version channel is cloned from the closure, and then the clone is moved into the future, so there are no borrows involved anymore.

Your explanation is close. You would still get an error if you changed it to async move, though. The error just wouldn't involve lifetimes in that way.
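Here is a rough std-only sketch of that other error. It uses an inner `move` closure as a stand-in for the `async move` block, since both capture by value; `call_twice` and `lengths` are hypothetical names, and the String is just a placeholder for the channel.

```rust
// Hypothetical helper that needs to call its argument more than once,
// so it asks for `Fn` rather than `FnOnce`.
fn call_twice<F, T>(f: F) -> (T, T)
where
    F: Fn() -> T,
{
    (f(), f())
}

fn lengths() -> (usize, usize) {
    // Placeholder for the channel: an owned, non-Copy value.
    let channel = String::from("chan");

    // This version would NOT compile. The inner `move` closure takes
    // `channel` out of the outer closure's captured state, so the outer
    // closure can only run once and implements `FnOnce`, not `Fn`:
    //
    //     call_twice(move || move || channel.len());

    // Cloning first means the inner closure moves the fresh clone instead,
    // and the outer closure keeps its own copy for the next call.
    let (a, b) = call_twice(move || {
        let channel = channel.clone();
        move || channel.len()
    });
    (a(), b())
}

fn main() {
    println!("{:?}", lengths());
}
```

In other words, the clone inside the closure body is what keeps the closure callable repeatedly; the clone outside it is only needed if the surrounding code wants to keep using the original.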

Alright, I think I get it now.
Thank you for all the help !!