Hello,
I am writing a backend for a card game in Rust, using Rocket. Because each card one player deals should be reflected in changes for every other player, I am using websockets.
For each client, a handler function handles all incoming requests. But I also need to actively push messages to each client, if another player / client does something.
So I have set up a data structure in Rockets State:
Arc<Mutex<HashMap<String, Sender<String>>>>
.
That structure should hold the transmit end of a channel to each connected client.
Each client creates a channel:
let (tx, mut rx) = channel(1)
The tx
value it put into the HashMap
, the rx
value is kept in the handler function.
Then I use a select!
to listen
a) for any incoming message from "my" client and
b) for any message arriving at "my" rx
end of the channel.
The problem is, I can send a message into the channel but it does not arrive at the rx
. What am I missing?
Here is a compressed code of the function that handles new websocket connections:
type PeersMap = Arc<Mutex<HashMap<String, Sender<String>>>>;
#[get("/ws/<user>")]
fn mirror(user: String, ws: WebSocket, peers_map: &State<PeersMap>) -> Channel<'static> {
let peers_map = peers_map.inner().clone();
ws.channel(move |mut stream| {
Box::pin(async move {
let (tx, mut rx) = channel(1);
peers_map.lock().await.insert(user.clone(), tx);
loop {
select! {
// handle incoming message from client
message = stream.next() => match message {
Some(Ok(Message::Text(text))) => {
println!("Received message: {:?}", text);
let _ = stream.send(Message::Text(format!("Echo: {}", text))).await;
peers_map.lock().await.iter().for_each(|(peer, tx)| {
if peer != &user {
println!("Sending message to {}: {:?}", peer, text);
let _ = tx.clone().send(format!("{} says: {}", user, text));
}
});
}
// ... handle cases to finish the connection
},
// handle incoming message from the channel
Some(message) = rx.next() => {
println!("Received message from other client: {:?}", message);
let _ = stream.send(Message::Text(message)).await;
},
else => break,
}
}
// cleanup, the connection is closed
peers_map.lock().await.remove(&user);
Ok(())
})
})
}
The function should mirror any incoming message back to the sender and at the same time transmit the message to any other client to be forwarded to them as well – a bit like a chat client. I know, it is far from my card game …
From the print!
statements, I can see that the tx.clone().send(...)
is done. But the select!
-branch Some(message) = rx.next()
is never called.
It seems that the channel does not work. Am I missing something?
I have tried setting the channel capacity to higher values, that doesn't change anything.
Any help is appreciated.
Thanks in advance.