I'm getting 2 Redis subscription messages from an async tokio spawn

I'm making a messaging app using Rust with socketioxide and redis-rs, and Next.js.

To deliver messages across different servers I'm using the Redis pub/sub mechanism with async code. When I send a message, it gets published to the Redis instance correctly, but on the subscribing side I receive each message twice from the task spawned with tokio.

The following code is responsible for handling subscription messages:

pub async fn redis_subscription_handler(
    redis: Arc<Mutex<Redis>>,
    socket: SocketRef,
    delay: Duration,
) -> Result<(), Box<dyn std::error::Error>> {
    let redis_guard = redis.lock().await;
    let mut pubsub = redis_guard
        .client
        .get_tokio_connection()
        .await
        .unwrap()
        .into_pubsub();
    let _ = pubsub.subscribe("Messages").await;

    // Spawn an asynchronous task
    tokio::spawn(async move {
        // Inside the spawned task, use a loop to continuously process messages
        while let Some(msg) = pubsub.on_message().next().await {
            // Parse the payload
            let payload: Value = match serde_json::from_str(&msg.get_payload::<String>().unwrap()) {
                Ok(payload) => payload,
                Err(err) => {
                    error!("Error parsing JSON payload: {}", err);
                    continue; // Skip to the next iteration if parsing fails
                }
            };
            info!("Received message: {:?}", payload);

            // Extract room_id from payload
            let room_id = match payload["room_id"].as_str() {
                Some(room_id) => room_id.to_string(),
                None => {
                    error!("Error: 'room_id' field missing in payload");
                    continue; // Skip to the next iteration if 'room_id' is missing
                }
            };

            // Emit the message to the socket
            socket.within(room_id).emit("message", payload).unwrap();

            // Introduce a delay between processing each message
            tokio::time::sleep(delay).await;
        }
    });

    Ok(())
}

This is what the response looks like

Here is the full code
Thanks for the help

My first guess would be that you have multiple subscriptions running at the same time, i.e. you execute redis_subscription_handler multiple times, spawning a new task each time. To test that, I'd create a random number in the task and add it to the log message, like this:

pub async fn redis_subscription_handler(
    redis: Arc<Mutex<Redis>>,
    socket: SocketRef,
    delay: Duration,
) -> Result<(), Box<dyn std::error::Error>> {
    let redis_guard = redis.lock().await;
    let mut pubsub = redis_guard
        .client
        .get_tokio_connection()
        .await
        .unwrap()
        .into_pubsub();
    let _ = pubsub.subscribe("Messages").await;

    // Spawn an asynchronous task
    tokio::spawn(async move {
+       let task_id: u128 = rand::random();

        // Inside the spawned task, use a loop to continuously process messages
        while let Some(msg) = pubsub.on_message().next().await {
            // Parse the payload
            let payload: Value = match serde_json::from_str(&msg.get_payload::<String>().unwrap()) {
                Ok(payload) => payload,
                Err(err) => {
                    error!("Error parsing JSON payload: {}", err);
                    continue; // Skip to the next iteration if parsing fails
                }
            };
-           info!("Received message: {:?}", payload);
+           info!("Task {task_id} received message: {:?}", payload);

            // Extract room_id from payload
            let room_id = match payload["room_id"].as_str() {
                Some(room_id) => room_id.to_string(),
                None => {
                    error!("Error: 'room_id' field missing in payload");
                    continue; // Skip to the next iteration if 'room_id' is missing
                }
            };

            // Emit the message to the socket
            socket.within(room_id).emit("message", payload).unwrap();

            // Introduce a delay between processing each message
            tokio::time::sleep(delay).await;
        }
    });

    Ok(())
}

I tried it, and yes, it got called two times.
I also tried to work with a single connection, but I couldn't get mutable access to the connection, and the connection type doesn't implement the Copy trait. I tried using Mutex and Arc, but that didn't work either. The approach above is what got it working, but it produces two messages.

Can you help me get around this?

So I'm right? You have two connections open, both receiving messages from your Messages Redis channel? If that's the case, I believe the correct way to fix this is to refactor your code so that you only ever run the Redis subscriber once in your program. I don't know your codebase, but I'd assume it would make sense to spawn the listener in your main function and provide it with the sockets you want to relay your messages to, either via an mpsc channel[1] or via shared memory (i.e. Arc<Mutex<Vec<SocketRef>>>).


  1. You could use select! in a loop to listen to both the mpsc channel and your Redis messages simultaneously
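
The single-listener idea above can be sketched roughly as follows. This is only an illustration under stated assumptions, not code from the project in question: standard-library threads and channels stand in for tokio tasks and for the Redis pub/sub stream so that the sketch runs without any dependencies, and the names Outbox, Registry, spawn_single_listener and demo are made up for the example. In the real app the loop body would be the existing `while let Some(msg) = pubsub.on_message().next().await` loop, started exactly once.

```rust
use std::sync::mpsc::{self, Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical stand-in for a connected client (a `SocketRef` in the real app).
type Outbox = Sender<String>;

/// Shared registry of connected clients (the shared-memory option).
type Registry = Arc<Mutex<Vec<Outbox>>>;

/// Starts the one and only listener. `incoming` stands in for the Redis
/// pub/sub message stream. Returns a handle yielding the number of messages relayed.
fn spawn_single_listener(
    incoming: Receiver<String>,
    registry: Registry,
) -> thread::JoinHandle<usize> {
    thread::spawn(move || {
        let mut relayed = 0;
        // The loop ends once every sender for `incoming` has been dropped.
        for msg in incoming {
            let mut clients = registry.lock().unwrap();
            // Relay to every registered client; forget clients whose receiver is gone.
            clients.retain(|client| client.send(msg.clone()).is_ok());
            relayed += 1;
        }
        relayed
    })
}

/// Registers two clients, publishes two messages, and returns what each
/// client received plus the number of messages the listener relayed.
fn demo() -> (Vec<String>, Vec<String>, usize) {
    let (publish, incoming) = mpsc::channel();
    let registry: Registry = Arc::new(Mutex::new(Vec::new()));

    // Two clients connect. Neither connection starts another listener;
    // each one only adds itself to the registry.
    let (tx_a, rx_a) = mpsc::channel();
    let (tx_b, rx_b) = mpsc::channel();
    registry.lock().unwrap().push(tx_a);
    registry.lock().unwrap().push(tx_b);

    // Started once, e.g. from `main`, never from a per-connection handler.
    let listener = spawn_single_listener(incoming, Arc::clone(&registry));

    publish.send("hello".to_string()).unwrap();
    publish.send("world".to_string()).unwrap();
    drop(publish); // close the stream so the listener loop finishes

    let relayed = listener.join().unwrap();

    // Everything has been relayed by now, so a non-blocking drain is enough.
    let a: Vec<String> = rx_a.try_iter().collect();
    let b: Vec<String> = rx_b.try_iter().collect();
    (a, b, relayed)
}
```

Calling demo() from main should show each client receiving every message exactly once, because there is only one subscriber loop no matter how many clients register. If the listener were instead started inside the per-connection handler, as in the original redis_subscription_handler, every new connection would add another subscriber and each message would be relayed once per subscriber.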

Yes sure, I will try.

You can try to go through my codebase

Hey,

Can we connect?

I haven't been able to get around this. I've been stuck on this one issue for 5 to 6 days.
