How do I save warp::tx in a struct as a instance

Hi, I am very new to Rust and as a newbie I copied over this code from the warp example. What I am trying to do is save an instance of connected clients in the instance clients, so that I can unwrap it and call send on the tx from a different implementation. I am sure I am completely butchering how this is to be done, so please if you have a better suggestion, I would really appreciate it.

    #[derive(Debug, Clone)]
    pub(crate) struct WebSocketServer {
        client_id: Option<usize>,
        clients: Option<Arc<RwLock<HashMap<usize, UnboundedSender<Message>>>>>,
    }

    impl WebSocketServer {
        pub async fn wsc(&mut self) {
            let clients = Clients::default();
            self.clients = Some(clients.clone());
            let clients = warp::any().map(move || clients.clone());
            let client_id = CLIENT_ID.fetch_add(1, Ordering::Relaxed);
            self.client_id = Some(client_id);

            let routes = warp::path("ws").and(warp::ws()).and(clients).map(
                move |ws: warp::ws::Ws,
                      clients: Arc<RwLock<HashMap<usize, mpsc::UnboundedSender<Message>>>>| {
                    ws.on_upgrade( move |socket| {
                        self.on_client_connect(client_id, socket, clients);
                    })
                },
            );
            println!("- WebSocket Server Started");
            warp::serve(routes).run(([127, 0, 0, 1], 3900)).await;
        }

        pub async fn send_message_to_client(&self) {
            println!("clone clients {:?}", self.clients.clone().unwrap());
            if self.clients.is_some() {
                let clients = self.clients.clone().unwrap();
                let tx = clients.read().await;
                let value = tx.get(&self.client_id.unwrap());
                let txx = value.unwrap();
                let _ = txx.send(Message::text("hello from server"));
            } else {
                println!("Empty clients");
            }
        }

        async fn on_client_connect(&self, id: usize, ws: WebSocket, clients: Clients) {
            eprintln!("Connection received {} ", id);

            let (mut client_ws_tx, mut client_wx_rx) = ws.split();

            let (tx, rx) = mpsc::unbounded_channel();
            let mut rx: UnboundedReceiverStream<Message> = UnboundedReceiverStream::new(rx);

            tokio::task::spawn(async move {
                while let Some(message) = rx.next().await {
                    client_ws_tx
                        .send(message)
                        .unwrap_or_else(|e| {
                            eprintln!("websocket send error {}", e);
                        })
                        .await;
                }
            });

            let _ = tx.send(Message::text("Ping from Server"));

            clients.write().await.insert(id, tx);

            let clone_clients = clients.clone();
            let clients = clone_clients;
            let tx = clients.read().await;
            let id: usize = 1;
            let value = tx.get(&id);
            println!("Values {:?}", value);

            while let Some(result) = client_wx_rx.next().await {
                match result {
                    Ok(mess) => {
                        let p = "Message received";

                        eprintln!("{} {:?} ", p, mess);
                    }
                    Err(e) => {
                        eprintln!("WebSocket error(uid={}): {}", id, e);
                        break;
                    }
                };
            }
        }

Error in Cargo Build

error[E0525]: expected a closure that implements the `Fn` trait, but this closure only implements `FnOnce`
   --> src/ws.rs:66:17
    |
65  |                let routes = warp::path("ws").and(warp::ws()).and(clients).map(
    |                                                                           --- required by a bound introduced by this call
66  | //                 move |ws: warp::ws::Ws,
67  | ||                       clients: Arc<RwLock<HashMap<usize, mpsc::UnboundedSender<Message>>>>...
    | ||___________________________________________________________________________________________^ this closure implements `FnOnce`, not `Fn`
68  | |                      ws.on_upgrade( move |socket| {
69  | |                          self.on_client_connect(client_id, socket, clients)
    | |                          ---- closure is `FnOnce` because it moves the variable `self` out of its environment
70  | |                      })
71  | |                  },
    | |__________________- the requirement to implement `Fn` derives from here
    |
    = note: required for `{closure@src/ws.rs:66:17: 67:92}` to implement `warp::generic::Func<(Ws, std::sync::Arc<tokio::sync::RwLock<HashMap<usize, tokio::sync::mpsc::UnboundedSender<Message>>>>)>`
note: required by a bound in `warp::Filter::map`
   --> registry/src/index.crates.io-6f17d22bba15001f/warp-0.3.6/src/filter/mod.rs:194:12
    |
191 |     fn map<F>(self, fun: F) -> Map<Self, F>
    |        --- required by a bound in this associated function
...
194 |         F: Func<Self::Extract> + Clone,
    |            ^^^^^^^^^^^^^^^^^^^ required by this bound in `Filter::map`

error[E0277]: the trait bound `&mut WebSocketServer: Clone` is not satisfied in `{closure@src/ws.rs:66:17: 67:92}`
   --> src/ws.rs:66:17
    |
65  |                let routes = warp::path("ws").and(warp::ws()).and(clients).map(
    |                                                                           --- required by a bound introduced by this call
66  | //                 move |ws: warp::ws::Ws,
67  | ||                       clients: Arc<RwLock<HashMap<usize, mpsc::UnboundedSender<Message>>>>...
    | ||___________________________________________________________________________________________- within this `{closure@src/ws.rs:66:17: 67:92}`
68  | |                      ws.on_upgrade( move |socket| {
69  | |                          self.on_client_connect(client_id, socket, clients)
70  | |                      })
71  | |                  },
    | |__________________^ within `{closure@src/ws.rs:66:17: 67:92}`, the trait `Clone` is not implemented for `&mut WebSocketServer`
    |
    = help: the trait `Clone` is implemented for `WebSocketServer`
    = note: `Clone` is implemented for `&WebSocketServer`, but not for `&mut WebSocketServer`
note: required because it's used within this closure
   --> src/ws.rs:66:17
    |
66  | / ...   move |ws: warp::ws::Ws,
67  | | ...         clients: Arc<RwLock<HashMap<usize, mpsc::UnboundedSender<Message>>>>| {
    | |_________________________________________________________________________________^
note: required by a bound in `warp::Filter::map`
   --> registry/src/index.crates.io-6f17d22bba15001f/warp-0.3.6/src/filter/mod.rs:194:34
    |
191 |     fn map<F>(self, fun: F) -> Map<Self, F>
    |        --- required by a bound in this associated function
...
194 |         F: Func<Self::Extract> + Clone,
    |                                  ^^^^^ required by this bound in `Filter::map`

warning: unused import: `futures::FutureExt`
  --> src/ws.rs:10:9
   |
10 |     use futures::FutureExt;
   |         ^^^^^^^^^^^^^^^^^^

Hi, welcome! One thing that could help others diagnose this is to post the entire error message from running cargo build in the terminal. Rust errors have a lot of info in them, including showing where borrowing occurs, and sometimes helpful hints. Not all this info is shown in IDEs, or at least not all at the same time for easy copying.

1 Like

Thank you, I edited the post with cargo build errors. If there are concepts that I need to understand, please do recommend as well.

1 Like

Let's tackle the first error first:

What the compiler error is telling you is that here you are moving self (that's what the move keyword means):

---- closure is `FnOnce` because it moves the variable `self` out of its environment

One idea on how to fix this is to make on_client_connect a standalone function rather than a method, so that self does not get captured in the closure.

hi @moy2010,

I had on_client_connect as an associated function, and that got everything running fine. However, the main issue I faced was, even though I set the clients to self self.clients = Some(clients.clone()); and I thought this would create a pointer to the same object in memory.

After I set the clients to self, I then pass the clients to self.on_client_connect(client_id, socket, clients);, And this function adds clients.write().await.insert(id, tx); the newly connected clients to the data structure. And right after I add, I do a println!("Values {:?}", value); to verify if a record is added to the datastructure.

However, when I call send_message_to_client from a different implementation something like wss.send_message_to_client(), the self.clients.is_some() is always None, even though I just verified in the print statement that it was added to the data structure.

And, I really appreciate everyone time helping a newbie.

Can you share how is Clients defined?

Here I see a lot of "Clients` clones:

let clients = Clients::default();
self.clients = Some(clients.clone());
let clients = warp::any().map(move || clients.clone());

If, let's say, Clients is not behind a smart pointer such as Arc then everytime that you clone it you'll get a new instance.

1 Like

Ah! thank you, learnt something about Arc now.

Clients definition: To start with I just wanted 1 client to connect even though its using a HashMap.

 #[derive(Debug, Clone)]
    pub(crate) struct WebSocketServer {
        client_id: Option<usize>,
        clients: Option<Arc<RwLock<HashMap<usize, UnboundedSender<Message>>>>>,
    }

@moy2010 you gave a really good hint about Arc and pointing me to a right direction, thank you again. To at least solve my issue, I created a shared_clients with Arc, and have WebSocketServer share the same data behind the pointer.

    let shared_clients = Arc::new(RwLock::new(
        HashMap::<usize, UnboundedSender<Message>>::new(),
    ));
    wss_server.set_shared_clients(shared_clients);
1 Like