Calling async functions for ampqrs

I'm new in Rust and I am struggling a little bit with async functions. The goal of my program is to send messages to a RabbitMQ queue using amqprs.

I have defined two functions, one to get the channel and another one which actually send the message:

    //get the channel
    async fn get_amqp_channel() -> Channel {
        let connection_arguments = OpenConnectionArguments::new(RABBIT_SERVER_URL, PORT, USER, PASSWORD);
        let connection = Connection::open(&connection_arguments).await.unwrap();
        return connection.open_channel(None).await.unwrap();
    }

    //send the message
    async fn send_amqp_message(channel: &Channel, routing_key: &str, message: String) {
        let publish_arguments = BasicPublishArguments::new(EXCHANGE, routing_key);
        channel.basic_publish(BasicProperties::default(), message.into_bytes(), publish_arguments).await.unwrap();
    }

If I call them from a async function like this, the message is never sent (no panic either):

    fn send_command() {
        //build message
        let rt = tokio::runtime::Runtime::new().unwrap();
        rt.block_on(send_message(message_type, serde_json::to_string(&message).unwrap()));
    }

    async fn send_message(message_type : String, message : String) {
        let channel = get_amqp_channel().await;
        send_amqp_message(&channel, get_routing_key(message_type).as_str(), message).await;
    }

But, if I combine both functions into a single one, then everything works fine:

    async fn send_message(message_type : String, message : String) {
        //get_amqp_channel
        let connection_arguments = OpenConnectionArguments::new(RABBIT_DS_URL, PORT, USER, PASSWORD);
        let connection = Connection::open(&connection_arguments).await.unwrap();
        let channel = connection.open_channel(None).await.unwrap();
        
        //send_amqp_message
        let publish_arguments = BasicPublishArguments::new(EXCHANGE, get_routing_key(message_type).as_str());
        channel.basic_publish(BasicProperties::default(), message.into_bytes(), publish_arguments).await.unwrap();
    }

So, what's the difference between splitting this code in two funcionts?

As far as I understand, it shouldn't be any difference, since the block_on call forces the whole program to wait for the future to end. I'm pretty sure the answer would be obvious, but I've been struggling with this for a while.

Any help would be welcome.

Thank you very much!!.

Isn't it a problem that you are creating a different runtime in the first case?

I think I'm only creating a single runtime in the parent function (there's no other runtime creation):

let rt = tokio::runtime::Runtime::new().unwrap();

What I don't understand is the difference between two async functions and a single one with the same code.

it shouldn't have made any differences. the problem must be somewhere else.

you might have not explicitly created other runtime, but there's a concept called "current runtime context" in tokio, if you ever used the #[tokio::main] attrbiute or some of the functions under tokio::task module (e.g. tokio::task::block_in_place(), or tokio:spawn(), which is an alias of tokio::task::spawn()), you probably have extra runtime than you think.

1 Like

Finally, I have found the problem: in the get_amqp_channel function, a Channel struct is returned, which has a Connection element inside. When the control returns to the main function, the connection is not longer active, so when the time for sending the message comes, there's no connection linked to this channel (channel.is_connection_open() == false).

It seems to me that connection is automatically closed whenever the connection variable is out of scope, despite of having the channel active.

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.