Is it possible to share the websocket TX & RX to route filter

Hi,
Is it possible to share the websocket TX and RX halves with a route filter?

warp::path::end()
.and(warp::ws())
.map(|ws: Ws| ws.on_upgrade(move |websocket: WebSocket| ws_fn(websocket)))
.and(warp::get())
.and_then(route1)

Something like this Stack Overflow question: "javascript - Emiting websocket message from routes".

So basically, if route1 or route2 has something to emit, how can it send that over the websocket?

If you want to set up something like that Stack Overflow question, you need:

  1. A route that creates new websocket subscriptions and stores them in some shared state
  2. Access to that shared state in the other routes, so they can send messages to the subscribed connections

Your shared state can be something like an Arc<Mutex<Vec<WebSocket>>> if you're just sending a message to all connected users.
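To make the two steps concrete, here is a minimal sketch of the pattern using only the standard library, so it runs without warp. It is a model of the idea, not warp's API: an mpsc::Sender<String> stands in for each upgraded websocket, and the names Subscribers, subscribe and publish are made up for this illustration.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, Mutex};

/// Shared registry of subscribers. In a warp server each entry would be
/// an upgraded websocket; a channel sender stands in for it here.
pub type Subscribers = Arc<Mutex<Vec<Sender<String>>>>;

/// Step 1: what the "subscribe" route does. It registers a new
/// connection in the shared state and returns the receiving end.
pub fn subscribe(subs: &Subscribers) -> Receiver<String> {
    let (tx, rx) = channel();
    subs.lock().unwrap().push(tx);
    rx
}

/// Step 2: what any other route does when it has something to emit.
/// It sends the message to every registered subscriber and removes
/// subscribers whose receiving end has been dropped.
/// Returns the number of subscribers the message was delivered to.
pub fn publish(subs: &Subscribers, message: &str) -> usize {
    let mut guard = subs.lock().unwrap();
    // `retain` keeps only the senders whose `send` succeeded.
    guard.retain(|tx| tx.send(message.to_string()).is_ok());
    guard.len()
}
```

Each route would hold its own clone of the Arc, call subscribe when a client connects, and call publish whenever it has something to emit. Dropping a receiver plays the role of a client disconnecting, which is why publish prunes failed senders.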


Thanks.
Can you please give me a short example or snippet?

Not exactly elegant, but hopefully it gets the point across:

use futures::{SinkExt, StreamExt};
use std::{convert::Infallible, sync::Arc, time::Duration};
use tokio::sync::Mutex;
use tokio_tungstenite::connect_async;
use warp::{
    hyper::StatusCode,
    ws::{Message, Ws},
    Filter,
};

#[tokio::main]
async fn main() {
    let state = Arc::new(Mutex::new(Vec::new()));
    let server = warp::path("subscribe")
        .and(warp::ws())
        .map({
            let state = state.clone();
            move |ws: Ws| {
                ws.on_upgrade({
                    let state = state.clone();
                    move |ws| async move {
                        state.lock().await.push(ws);
                    }
                })
            }
        })
        .or(warp::path!("publish" / String).and(warp::post()).and_then({
            let state = state.clone();
            move |message| {
                let state = state.clone();
                async move {
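                    let mut guard = state.lock().await;

                    // Send to every subscriber. A failed send means the peer
                    // has gone away, so forget that connection instead of panicking.
                    let mut i = 0;
                    while i < guard.len() {
                        if guard[i].send(Message::text(&message)).await.is_ok() {
                            i += 1;
                        } else {
                            guard.swap_remove(i);
                        }
                    }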

                    Ok::<_, Infallible>(StatusCode::OK)
                }
            }
        }));

    tokio::spawn(run_subscribe());
    tokio::spawn(run_send());

    warp::serve(server).run(([127, 0, 0, 1], 3030)).await;
}

async fn run_subscribe() {
    let (mut stream, _) = connect_async("ws://localhost:3030/subscribe")
        .await
        .unwrap();

    while let Some(message) = stream.next().await {
        let message = message.unwrap();

        println!("{message:?}")
    }
}

async fn run_send() {
    let client = reqwest::Client::new();

    for i in 0..10 {
        tokio::time::sleep(Duration::from_secs(1)).await;
        client
            .post(format!("http://localhost:3030/publish/Message{i}"))
            .send()
            .await
            .unwrap();
    }
}

It also needs some extra dependencies:

futures = "0.3.25"
reqwest = "0.11.13"
tokio-tungstenite = "0.18.0"
tokio = { version = "1.24.1", features = ["full"] }
warp = "0.3.3"

It should output something along the lines of:

Text("Message0")
Text("Message1")
Text("Message2")
Text("Message3")
Text("Message4")
Text("Message5")
Text("Message6")
Text("Message7")
Text("Message8")
Text("Message9")

Note that the run_subscribe and run_send functions might fail semi-randomly, since nothing enforces that the server is up before they run. It worked on my machine, but you wouldn't want to leave it like that in production code.
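One possible way to narrow that race, sketched with only the standard library: poll the port until something accepts a connection before starting the clients. The helper name wait_for_port and its parameters are hypothetical, not part of warp or tokio. The call blocks, so in the async example above it would need to run via something like tokio::task::spawn_blocking, or be rewritten with tokio's async TcpStream.

```rust
use std::net::{SocketAddr, TcpStream};
use std::thread::sleep;
use std::time::Duration;

/// Polls `addr` until a TCP connection succeeds or `attempts` runs out.
/// Returns true as soon as something is accepting connections there,
/// false if every attempt failed.
/// `delay` must be non-zero: `connect_timeout` rejects a zero timeout.
pub fn wait_for_port(addr: SocketAddr, attempts: u32, delay: Duration) -> bool {
    for _ in 0..attempts {
        // The probe connection is dropped immediately; only success matters.
        if TcpStream::connect_timeout(&addr, delay).is_ok() {
            return true;
        }
        sleep(delay);
    }
    false
}
```

A client task could call this with the server address (127.0.0.1:3030 in the example) and only then open the websocket or send the POST requests. It only shows that the port is accepting connections, so it reduces the startup race rather than replacing proper error handling in the clients.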


Thank you so much ❤️
