I'm making a messaging app using Rust (socketioxide and redis-rs) with Next.js.
To deliver messages across different servers I'm using the Redis pub/sub mechanism with async code. When I send a message, it is published correctly to the Redis instance, but on the subscribing side I receive the same message twice from the async Tokio task.
The following code is responsible for handling subscription messages:
pub async fn redis_subscription_handler(
    redis: Arc<Mutex<Redis>>,
    socket: SocketRef,
    delay: Duration,
) -> Result<(), Box<dyn std::error::Error>> {
    let redis_guard = redis.lock().await;
    let mut pubsub = redis_guard
        .client
        .get_tokio_connection()
        .await
        .unwrap()
        .into_pubsub();
    let _ = pubsub.subscribe("Messages").await;
    // Spawn an asynchronous task
    tokio::spawn(async move {
        // Inside the spawned task, use a loop to continuously process messages
        while let Some(msg) = pubsub.on_message().next().await {
            // Parse the payload
            let payload: Value = match serde_json::from_str(&msg.get_payload::<String>().unwrap()) {
                Ok(payload) => payload,
                Err(err) => {
                    error!("Error parsing JSON payload: {}", err);
                    continue; // Skip to the next iteration if parsing fails
                }
            };
            info!("Received message: {:?}", payload);
            // Extract room_id from payload
            let room_id = match payload["room_id"].as_str() {
                Some(room_id) => room_id.to_string(),
                None => {
                    error!("Error: 'room_id' field missing in payload");
                    continue; // Skip to the next iteration if 'room_id' is missing
                }
            };
            // Emit the message to the socket
            socket.within(room_id).emit("message", payload).unwrap();
            // Introduce a delay between processing each message
            tokio::time::sleep(delay).await;
        }
    });
    Ok(())
}
My first guess would be that you have multiple subscriptions running at the same time, i.e. you execute redis_subscription_handler multiple times, spawning a new task each time. To test that, I'd generate a random number in the task and add it to the log message. Like:
 pub async fn redis_subscription_handler(
     redis: Arc<Mutex<Redis>>,
     socket: SocketRef,
     delay: Duration,
 ) -> Result<(), Box<dyn std::error::Error>> {
     let redis_guard = redis.lock().await;
     let mut pubsub = redis_guard
         .client
         .get_tokio_connection()
         .await
         .unwrap()
         .into_pubsub();
     let _ = pubsub.subscribe("Messages").await;
     // Spawn an asynchronous task
     tokio::spawn(async move {
+        let task_id: u128 = rand::random();
         // Inside the spawned task, use a loop to continuously process messages
         while let Some(msg) = pubsub.on_message().next().await {
             // Parse the payload
             let payload: Value = match serde_json::from_str(&msg.get_payload::<String>().unwrap()) {
                 Ok(payload) => payload,
                 Err(err) => {
                     error!("Error parsing JSON payload: {}", err);
                     continue; // Skip to the next iteration if parsing fails
                 }
             };
-            info!("Received message: {:?}", payload);
+            info!("Task {task_id} received message: {:?}", payload);
             // Extract room_id from payload
             let room_id = match payload["room_id"].as_str() {
                 Some(room_id) => room_id.to_string(),
                 None => {
                     error!("Error: 'room_id' field missing in payload");
                     continue; // Skip to the next iteration if 'room_id' is missing
                 }
             };
             // Emit the message to the socket
             socket.within(room_id).emit("message", payload).unwrap();
             // Introduce a delay between processing each message
             tokio::time::sleep(delay).await;
         }
     });
     Ok(())
 }
I tried it, and yes, it got called two times.
I also tried to work with a single connection, but I couldn't get a mutable reference to the connection, and the type doesn't implement the Copy trait. I tried wrapping it in Mutex and Arc, but that didn't work either. The approach above is what got me a working version, but it produces two messages.
So I'm right? You have two connections open, both receiving messages from your Messages Redis topic? If that's the case I believe the correct way to fix this is to refactor your code such that you only ever run the Redis subscriber once in your program. I don't know your codebase, but I'd assume it would make sense to spawn the listener in your main function and provide it with the sockets you want to relay your messages to either via an mpsc channel[1] or via shared memory (i.e. Arc<Mutex<Vec<SocketRef>>>).
[1] You could use select! in a loop to listen to both the mpsc channel and your Redis messages simultaneously.
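To make the "run the subscriber only once" idea concrete, here is a minimal sketch of the shared-memory variant. It uses only the standard library so it can be run on its own, and every name in it is hypothetical: `Outbox` stands in for `SocketRef`, the `incoming` receiver stands in for the Redis pub/sub stream, and a plain thread stands in for the Tokio task. In the real app you would presumably use `tokio::spawn` and `tokio::sync::mpsc` instead, but the shape should be similar: one listener, started once, forwarding each message to every registered client.

```rust
use std::sync::mpsc::{self, Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};

/// Hypothetical stand-in for a connected client (`SocketRef` in the thread).
type Outbox = Sender<String>;

/// Shared list of clients, analogous to `Arc<Mutex<Vec<SocketRef>>>`.
type Registry = Arc<Mutex<Vec<Outbox>>>;

/// Starts the single listener. `incoming` is a stand-in for the Redis
/// pub/sub stream. Call this exactly once, e.g. from `main`.
fn spawn_single_listener(incoming: Receiver<String>, registry: Registry) -> JoinHandle<()> {
    thread::spawn(move || {
        // The loop ends when every sender for `incoming` has been dropped.
        for msg in incoming {
            let mut clients = registry.lock().unwrap();
            // Forward the message once per client and drop clients
            // whose receiving side has gone away.
            clients.retain(|client| client.send(msg.clone()).is_ok());
        }
    })
}

/// Registers a new client and returns the receiving end of its outbox.
fn register_client(registry: &Registry) -> Receiver<String> {
    let (tx, rx) = mpsc::channel();
    registry.lock().unwrap().push(tx);
    rx
}

/// Two clients, one listener, one published message.
fn demo() -> (Vec<String>, Vec<String>) {
    let registry: Registry = Arc::new(Mutex::new(Vec::new()));
    let (publish, incoming) = mpsc::channel();
    let listener = spawn_single_listener(incoming, Arc::clone(&registry));

    let a = register_client(&registry);
    let b = register_client(&registry);

    publish.send("hello".to_string()).unwrap();
    drop(publish); // closing the source lets the listener loop finish
    listener.join().unwrap();

    // Collect whatever each client received, without blocking.
    (a.try_iter().collect(), b.try_iter().collect())
}
```

Calling `demo()` should hand each client exactly one copy of the message, because there is only one listener no matter how many clients register. New connections only call `register_client`; they never start another subscriber.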