Rocket, Websockets and Channels – I recieve nothing

Hello,

I am writing a backend for a card game in Rust, using Rocket. Because each card one player deals should be reflected in changes for every other player, I am using websockets.

For each client, a handler function handles all incoming requests. But I also need to actively push messages to each client, if another player / client does something.

So I have set up a data structure in Rockets State:
Arc<Mutex<HashMap<String, Sender<String>>>>.

That structure should hold the transmit end of a channel to each connected client.

Each client creates a channel:
let (tx, mut rx) = channel(1)

The tx value it put into the HashMap, the rx value is kept in the handler function.

Then I use a select! to listen
a) for any incoming message from "my" client and
b) for any message arriving at "my" rx end of the channel.

The problem is, I can send a message into the channel but it does not arrive at the rx. What am I missing?

Here is a compressed code of the function that handles new websocket connections:

type PeersMap = Arc<Mutex<HashMap<String, Sender<String>>>>;

#[get("/ws/<user>")]
fn mirror(user: String, ws: WebSocket, peers_map: &State<PeersMap>) -> Channel<'static> {
    let peers_map = peers_map.inner().clone();
    ws.channel(move |mut stream| {
        Box::pin(async move {
            let (tx, mut rx) = channel(1);
            peers_map.lock().await.insert(user.clone(), tx);

            loop {
                select! {
                    // handle incoming message from client
                    message = stream.next() => match message {
                        Some(Ok(Message::Text(text))) => {
                            println!("Received message: {:?}", text);
                            let _ = stream.send(Message::Text(format!("Echo: {}", text))).await;
                            peers_map.lock().await.iter().for_each(|(peer, tx)| {
                                if peer != &user {
                                    println!("Sending message to {}: {:?}", peer, text);
                                    let _ = tx.clone().send(format!("{} says: {}", user, text));
                                }
                            });
                        }
                       // ... handle cases to finish the connection
                    },
                    // handle incoming message from the channel
                    Some(message) = rx.next() => {
                            println!("Received message from other client: {:?}", message);
                            let _ = stream.send(Message::Text(message)).await;
                    },
                    else => break,
                }
            }

            // cleanup, the connection is closed
            peers_map.lock().await.remove(&user);
            Ok(())
        })
    })
}

The function should mirror any incoming message back to the sender and at the same time transmit the message to any other client to be forwarded to them as well – a bit like a chat client. I know, it is far from my card game …

From the print! statements, I can see that the tx.clone().send(...) is done. But the select!-branch Some(message) = rx.next() is never called.

It seems that the channel does not work. Am I missing something?

I have tried setting the channel capacity to higher values, that doesn't change anything.

Any help is appreciated.
Thanks in advance.

Can you provide a reproducible example? The snippet that you provided doesn't seem to be valid code to me (This part: message = stream.next() => match message {).

Besides that, I find odd your choice of using select! here.

I will attach the complete code of the Rocket backend.

I need select! to wait for two async events. What alternative is there?

extern crate rocket;

use std::collections::HashMap;
use std::sync::Arc;

use rocket::futures::channel::mpsc::{channel, Sender};
use rocket::futures::{SinkExt, StreamExt};
use rocket::tokio::select;
use rocket::tokio::sync::Mutex;
use rocket::{get, launch, routes, State};
use rocket_ws::Message;
use rocket_ws::{Channel, WebSocket};

type PeersMap = Arc<Mutex<HashMap<String, Sender<String>>>>;

#[get("/ws/<user>")]
fn mirror(user: String, ws: WebSocket, peers_map: &State<PeersMap>) -> Channel<'static> {
    let peers_map = peers_map.inner().clone();
    ws.channel(move |mut stream| {
        Box::pin(async move {
            let (tx, mut rx) = channel(1);
            peers_map.lock().await.insert(user.clone(), tx);
            let count = peers_map.lock().await.len();
            println!("Connection opened ({} clients)", count);

            loop {
                select! {
                    message = stream.next() => match message {
                        Some(Ok(Message::Text(text))) => {
                            println!("Received message: {:?}", text);
                            let _ = stream.send(Message::Text(format!("Echo: {}", text))).await;
                            peers_map.lock().await.iter().for_each(|(peer, tx)| {
                                if peer != &user {
                                    println!("Sending message to {}: {:?}", peer, text);
                                    let _ = tx.clone().send(format!("{} says: {}", user, text));
                                }
                            });
                        }
                        Some(Ok(message)) => {
                            println!("Received message from client: {:?}", message);
                            let _ = stream.send(message).await;
                        }
                        Some(Err(error)) => {
                            println!("Error: {:?}", error);
                            break;
                        }
                        None => break,
                    },
                    Some(message) = rx.next() => {
                            println!("Received message from other client: {:?}", message);
                            let _ = stream.send(Message::Text(message)).await;
                    },
                    else => break,
                }
            }

            peers_map.lock().await.remove(&user);
            let count = peers_map.lock().await.len();
            println!("Connection closed ({} clients)", count);

            Ok(())
        })
    })
}

#[launch]
fn rocket() -> _ {
    let peers_map: PeersMap = Arc::new(Mutex::new(HashMap::new()));
    rocket::build()
        .manage(peers_map)
        .mount("/", routes![mirror])
}

And here is the cargo.toml-file:

[package]
name = "backend"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
rocket = "0.5.0"
rocket_ws = "0.1.0"
serde_json = "1.0.114"

I have also tried to switch to the "normal" channel

use rocket::tokio::sync::mpsc::{channel, Sender};

That did not work either.

Could the cloning of the tx be the problem? I did not find another solution since I need this object mutable and from the State I only get a immutable reference.

I have found the solution. The line:

  let _ = tx.clone().send(format!("{} says: {}", user, text));

returned a Promise and was therefore not executed.
I was not able to just add an await to it because the lambda inside the for_each was not async. But with a little help from ChatGPT, I got this working:

    Some(Ok(Message::Text(text))) => {
    println!("Received message: {:?}", text);
    let _ = stream.send(Message::Text(format!("Echo: {}", text))).await;
        let locked_peers_map = peers_map.lock().await;
        let send_futures = locked_peers_map.iter().filter_map(|(peer, tx)| {
            if peer != &user {
                println!("Sending message to {}: {:?}", peer, text);
                Some(tx.send(format!("{} says: {}", user, text)))
            } else {
                None
            }
        }).collect::<Vec<_>>();

        // Await all send operations concurrently
        let _results = join_all(send_futures).await;
    }

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.