Rocket, Websockets and Channels – I receive nothing


I am writing a backend for a card game in Rust, using Rocket. Because each card one player deals should be reflected in changes for every other player, I am using websockets.

For each client, a handler function handles all incoming requests. But I also need to actively push messages to each client, if another player / client does something.

So I have set up a data structure in Rocket's State:
Arc<Mutex<HashMap<String, Sender<String>>>>.

That structure should hold the transmit end of a channel to each connected client.

Each client creates a channel:
let (tx, mut rx) = channel(1)

The tx value is put into the HashMap; the rx value is kept in the handler function.
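The registry described above can be sketched without Rocket. This is only an illustration under stated assumptions: it uses the blocking std::sync::mpsc channel and std Mutex so that it runs without an async runtime, and the names Peers, register and broadcast are hypothetical, not part of the original code.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, Mutex};

// Hypothetical stand-in for the PeersMap kept in Rocket's State.
type Peers = Arc<Mutex<HashMap<String, Sender<String>>>>;

/// Registers `user` and returns the receiving end that its handler keeps.
fn register(peers: &Peers, user: &str) -> Receiver<String> {
    let (tx, rx) = channel();
    peers.lock().unwrap().insert(user.to_string(), tx);
    rx
}

/// Sends `text` to every registered peer except `from`.
/// Returns the number of sends that succeeded.
fn broadcast(peers: &Peers, from: &str, text: &str) -> usize {
    peers
        .lock()
        .unwrap()
        .iter()
        .filter(|(peer, _)| peer.as_str() != from)
        .filter(|(_, tx)| tx.send(format!("{} says: {}", from, text)).is_ok())
        .count()
}
```

With the std channel, send completes immediately. With an async channel, send returns a future instead, so each handler has to await it.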

Then I use a select! to listen
a) for any incoming message from "my" client and
b) for any message arriving at "my" rx end of the channel.

The problem is, I can send a message into the channel but it does not arrive at the rx. What am I missing?

Here is a condensed version of the function that handles new websocket connections:

type PeersMap = Arc<Mutex<HashMap<String, Sender<String>>>>;

fn mirror(user: String, ws: WebSocket, peers_map: &State<PeersMap>) -> Channel<'static> {
    let peers_map = peers_map.inner().clone();
    ws.channel(move |mut stream| {
        Box::pin(async move {
            let (tx, mut rx) = channel(1);
            peers_map.lock().await.insert(user.clone(), tx);

            loop {
                select! {
                    // handle incoming message from client
                    message = => match message {
                        Some(Ok(Message::Text(text))) => {
                            println!("Received message: {:?}", text);
                            let _ = stream.send(Message::Text(format!("Echo: {}", text))).await;
                            peers_map.lock().await.iter().for_each(|(peer, tx)| {
                                if peer != &user {
                                    println!("Sending message to {}: {:?}", peer, text);
                                    let _ = tx.clone().send(format!("{} says: {}", user, text));
                       // ... handle cases to finish the connection
                    // handle incoming message from the channel
                    Some(message) = => {
                            println!("Received message from other client: {:?}", message);
                            let _ = stream.send(Message::Text(message)).await;
                    else => break,

            // cleanup, the connection is closed

The function should mirror any incoming message back to the sender and at the same time transmit the message to any other client to be forwarded to them as well – a bit like a chat client. I know, it is far from my card game …

From the println! output, I can see that tx.clone().send(...) is reached. But the select! branch that receives from rx is never entered.

It seems that the channel does not work. Am I missing something?

I have tried setting the channel capacity to higher values, but that doesn't change anything.

Any help is appreciated.
Thanks in advance.

Can you provide a reproducible example? The snippet that you provided doesn't seem to be valid code to me (This part: message = => match message {).

Besides that, I find your choice of select! here odd.

I will attach the complete code of the Rocket backend.

I need select! to wait for two async events. What alternative is there?

extern crate rocket;

use std::collections::HashMap;
use std::sync::Arc;

use rocket::futures::channel::mpsc::{channel, Sender};
use rocket::futures::{SinkExt, StreamExt};
use rocket::tokio::select;
use rocket::tokio::sync::Mutex;
use rocket::{get, launch, routes, State};
use rocket_ws::Message;
use rocket_ws::{Channel, WebSocket};

type PeersMap = Arc<Mutex<HashMap<String, Sender<String>>>>;

// Route path assumed here; the attribute is not shown in the post.
#[get("/mirror/<user>")]
fn mirror(user: String, ws: WebSocket, peers_map: &State<PeersMap>) -> Channel<'static> {
    let peers_map = peers_map.inner().clone();
    ws.channel(move |mut stream| {
        Box::pin(async move {
            let (tx, mut rx) = channel(1);
            peers_map.lock().await.insert(user.clone(), tx);
            let count = peers_map.lock().await.len();
            println!("Connection opened ({} clients)", count);

            loop {
                select! {
                    // handle incoming message from client
                    message = stream.next() => match message {
                        Some(Ok(Message::Text(text))) => {
                            println!("Received message: {:?}", text);
                            let _ = stream.send(Message::Text(format!("Echo: {}", text))).await;
                            peers_map.lock().await.iter().for_each(|(peer, tx)| {
                                if peer != &user {
                                    println!("Sending message to {}: {:?}", peer, text);
                                    let _ = tx.clone().send(format!("{} says: {}", user, text));
                                }
                            });
                        }
                        Some(Ok(message)) => {
                            println!("Received message from client: {:?}", message);
                            let _ = stream.send(message).await;
                        }
                        Some(Err(error)) => {
                            println!("Error: {:?}", error);
                        }
                        None => break,
                    },
                    // handle incoming message from the channel
                    Some(message) = rx.next() => {
                        println!("Received message from other client: {:?}", message);
                        let _ = stream.send(Message::Text(message)).await;
                    },
                    else => break,
                }
            }

            let count = peers_map.lock().await.len();
            println!("Connection closed ({} clients)", count);

            Ok(())
        })
    })
}

#[launch]
fn rocket() -> _ {
    let peers_map: PeersMap = Arc::new(Mutex::new(HashMap::new()));
    rocket::build()
        .manage(peers_map)
        .mount("/", routes![mirror])
}

And here is the Cargo.toml file:

[package]
name = "backend"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at

[dependencies]
rocket = "0.5.0"
rocket_ws = "0.1.0"
serde_json = "1.0.114"

I have also tried switching to the "normal" Tokio channel:

use rocket::tokio::sync::mpsc::{channel, Sender};

That did not work either.

Could the cloning of the tx be the problem? I did not find another solution, since I need this object to be mutable and from the State I only get an immutable reference.
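As a quick check on the cloning question: a cloned sender feeds the same receiver as the original. The sketch below shows this with the std::sync::mpsc channel, chosen only because it runs without an async runtime; the function name send_via_clone is hypothetical. The mpsc channels in futures and Tokio are designed the same way, though this example does not exercise them.

```rust
use std::sync::mpsc::channel;

/// Sends one message through the original sender and one through a clone,
/// then returns everything the single receiver got, in order.
fn send_via_clone() -> Vec<String> {
    let (tx, rx) = channel::<String>();
    let cloned = tx.clone();
    tx.send("from original".to_string()).unwrap();
    cloned.send("from clone".to_string()).unwrap();
    // Drop both senders so the receiver's iterator terminates.
    drop(tx);
    drop(cloned);
    rx.iter().collect()
}
```

Both messages arrive at the one receiver, so cloning by itself does not explain a message going missing.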

I have found the solution. The line:

  let _ = tx.clone().send(format!("{} says: {}", user, text));

returned a Future that was never awaited, so the send was never executed.
I was not able to just add an await to it because the lambda inside the for_each was not async. But with a little help from ChatGPT, I got this working:

    Some(Ok(Message::Text(text))) => {
        println!("Received message: {:?}", text);
        let _ = stream.send(Message::Text(format!("Echo: {}", text))).await;
        let locked_peers_map = peers_map.lock().await;
        let send_futures = locked_peers_map.iter().filter_map(|(peer, tx)| {
            if peer != &user {
                println!("Sending message to {}: {:?}", peer, text);
                Some(tx.send(format!("{} says: {}", user, text)))
            } else {
                None
            }
        });

        // Await all send operations concurrently
        let _results = join_all(send_futures).await;
    }
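The root cause can be reproduced without Rocket or Tokio: a Rust future does nothing until it is polled. The sketch below is an illustration only. The async fn send is a hypothetical stand-in for tx.send(...), and block_on is a minimal busy-polling executor written just for this example, not something to use in real code.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Waker that does nothing; enough for futures that complete on first poll.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Hypothetical stand-in for `tx.send(...)`: the flag is only set
/// when the returned future is actually polled.
async fn send(delivered: Arc<AtomicBool>) {
    delivered.store(true, Ordering::SeqCst);
}

/// Minimal busy-polling executor, for illustration only.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

/// Returns (delivered after dropping the future, delivered after driving it).
fn dropped_vs_awaited() -> (bool, bool) {
    let dropped = Arc::new(AtomicBool::new(false));
    // Created but never polled: the body of `send` never runs.
    let _ = send(dropped.clone());

    let awaited = Arc::new(AtomicBool::new(false));
    block_on(send(awaited.clone()));

    (dropped.load(Ordering::SeqCst), awaited.load(Ordering::SeqCst))
}
```

The first call mirrors `let _ = tx.clone().send(...)` inside for_each: the future is built and immediately dropped, so nothing is sent. Only the future that is driven to completion has any effect.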
