Mpsc over internet, across wasm32/x86

#1
  1. Yes, this can be easily written via websockets. I’m wondering if there is a pre-existing lib before I build my own solution.

  2. I have a 100% Rust app, with foo_client compiled to wasm32, foo_server compiled for native, and both crates importing the crate foo_shared.

  3. In foo_shared, I have a struct FooMsg. Now I want an mpsc::channel over FooMsg where the sender is on wasm32 and the receiver is on native.

  4. I can build this via serde + websockets, but I’m wondering if there is a pre-existing lib that already does this.
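
To make the goal concrete, here is a rough, std-only sketch of the shape being asked for: a local channel on the client, a byte transport in the middle, and a native channel on the server. The `Transport` trait, `Loopback`, `pump`, `deliver`, and the `FooMsg` fields are all made up for illustration, and the hand-written `encode`/`decode` only stand in for serde; a real version would put a WebSocket behind `Transport`.

```rust
use std::sync::mpsc::{Receiver, Sender};

/// Message type that would live in `foo_shared` (fields are hypothetical).
#[derive(Debug, Clone, PartialEq)]
pub struct FooMsg {
    pub id: u32,
    pub text: String,
}

/// Hypothetical byte transport; on wasm32 this would wrap a WebSocket.
pub trait Transport {
    fn send_bytes(&mut self, bytes: &[u8]) -> Result<(), String>;
}

/// Stand-in for serde: a 4-byte little-endian id followed by UTF-8 text.
pub fn encode(msg: &FooMsg) -> Vec<u8> {
    let mut out = msg.id.to_le_bytes().to_vec();
    out.extend_from_slice(msg.text.as_bytes());
    out
}

/// Inverse of `encode`; returns None for short frames or invalid UTF-8.
pub fn decode(bytes: &[u8]) -> Option<FooMsg> {
    if bytes.len() < 4 {
        return None;
    }
    let (head, tail) = bytes.split_at(4);
    let id = u32::from_le_bytes([head[0], head[1], head[2], head[3]]);
    let text = String::from_utf8(tail.to_vec()).ok()?;
    Some(FooMsg { id, text })
}

/// Client side: drain the local channel into the transport.
/// Returns how many messages were handed over. A message whose send
/// fails is lost, because it has already been taken from the channel.
pub fn pump<T: Transport>(rx: &Receiver<FooMsg>, transport: &mut T) -> usize {
    let mut sent = 0;
    for msg in rx.try_iter() {
        if transport.send_bytes(&encode(&msg)).is_err() {
            break;
        }
        sent += 1;
    }
    sent
}

/// Server side: decode one incoming frame and forward it into a native channel.
pub fn deliver(frame: &[u8], tx: &Sender<FooMsg>) -> bool {
    match decode(frame) {
        Some(msg) => tx.send(msg).is_ok(),
        None => false,
    }
}

/// In-memory transport used only to exercise the sketch.
pub struct Loopback {
    pub frames: Vec<Vec<u8>>,
}

impl Transport for Loopback {
    fn send_bytes(&mut self, bytes: &[u8]) -> Result<(), String> {
        self.frames.push(bytes.to_vec());
        Ok(())
    }
}
```

The client calls `pump` whenever it gets a chance (for example once per frame), and the server calls `deliver` for each frame it receives. Nothing here says whether a message arrived; it only moves bytes.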

0 Likes

#2

There are crates for native-to-native messaging, but for communication between the Web and native you’ll most likely need websockets or something else network-oriented (HTTP, or WebRTC data channels if you like debugging too much), since the Web sandbox doesn’t have access to any low-level inter-process communication.

Using Serde for this is a very good idea. Serializing messages with Serde will nicely hide architecture differences, and your problem is simplified to just sending a bunch of bytes.
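
As an illustration of the kind of architecture difference that goes away: `usize` is 4 bytes on wasm32 and 8 bytes on x86_64, so copying a struct's memory across would not work, while a fixed-width encoding does. The sketch below does by hand roughly what a serializer would do for you (Serde's data model treats `usize` as a `u64`). The `Header` type and the 10-byte layout are made up for this example.

```rust
use std::convert::TryFrom;

/// Hypothetical header; `len` is a `usize`, which is 4 bytes on wasm32
/// and 8 bytes on x86_64.
#[derive(Debug, PartialEq)]
pub struct Header {
    pub len: usize,
    pub kind: u16,
}

pub const WIRE_SIZE: usize = 10;

/// Always writes 10 bytes: `len` widened to u64, then `kind`, both little-endian.
pub fn to_wire(h: &Header) -> [u8; WIRE_SIZE] {
    let mut out = [0u8; WIRE_SIZE];
    out[..8].copy_from_slice(&(h.len as u64).to_le_bytes());
    out[8..].copy_from_slice(&h.kind.to_le_bytes());
    out
}

/// Returns None if the frame has the wrong size or `len` does not fit
/// in this target's `usize` (possible when a 64-bit peer talks to wasm32).
pub fn from_wire(bytes: &[u8]) -> Option<Header> {
    if bytes.len() != WIRE_SIZE {
        return None;
    }
    let mut len_bytes = [0u8; 8];
    len_bytes.copy_from_slice(&bytes[..8]);
    let len = usize::try_from(u64::from_le_bytes(len_bytes)).ok()?;
    let kind = u16::from_le_bytes([bytes[8], bytes[9]]);
    Some(Header { len, kind })
}
```

Both sides agree on the wire layout regardless of pointer width or native endianness, which is the property you get for free from a Serde format.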

So I don’t know if there’s a crate for this already, but it shouldn’t be too hard to glue together something like bincode or MessagePack and some websocket interface.

1 Like

#3

Here’s what I have so far. Would appreciate any advice from anyone with experience writing these things:

There seems to be a fundamental limitation: without implementing acks (in effect rebuilding TCP on top of websockets), it seems impossible to know whether a message was actually delivered.
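
For reference, the sender-side bookkeeping that application-level acks would need could look roughly like the sketch below. `Outbox` and its methods are hypothetical names, not part of any crate. It assumes the server replies with the highest sequence number it has processed (a cumulative ack) and that frames arrive in order within one connection.

```rust
use std::collections::BTreeMap;

/// Sender-side bookkeeping for application-level acks.
pub struct Outbox<T> {
    next_seq: u64,
    pending: BTreeMap<u64, T>,
}

impl<T: Clone> Outbox<T> {
    pub fn new() -> Self {
        Outbox { next_seq: 0, pending: BTreeMap::new() }
    }

    /// Assign a sequence number and remember the message until it is acked.
    /// The returned pair is what would be serialized and sent.
    pub fn push(&mut self, msg: T) -> (u64, T) {
        let seq = self.next_seq;
        self.next_seq += 1;
        self.pending.insert(seq, msg.clone());
        (seq, msg)
    }

    /// The peer acked everything up to and including `seq`.
    pub fn ack(&mut self, seq: u64) {
        // split_off returns the entries with keys >= seq + 1; keep only those.
        self.pending = self.pending.split_off(&seq.saturating_add(1));
    }

    /// Messages to resend after a reconnect, oldest first.
    pub fn unacked(&self) -> Vec<(u64, T)> {
        self.pending.iter().map(|(s, m)| (*s, m.clone())).collect()
    }
}
```

After a reconnect the client would resend everything from `unacked()`. That can deliver a message twice, so the server has to tolerate duplicates.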

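// Assumes the parent module brings Rc, RefCell, mpsc, and the stdweb
// WebSocket and socket event types into scope.
use super::*;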


pub struct WSAutoData<ToClientMsg> {
    rc_ows: RefCell<Option<WebSocket>>,
    url: String,
    sender: mpsc::Sender<ToClientMsg>,
}

#[derive(Clone)]
pub struct WSAuto<ToClientMsg> (Rc<WSAutoData<ToClientMsg>>);


fn create_ws<ToClient>(url: &str, sender: mpsc::Sender<ToClient>) -> Option<WebSocket>
where
    ToClient: serde::de::DeserializeOwned + 'static,
{

    let ws = WebSocket::new(url).ok()?;

    // Log lifecycle events; each closure owns its own copy of the URL.
    let open_url = url.to_string();
    ws.add_event_listener(move |_: SocketOpenEvent| {
        console!(log, format!("connection opened: {}", open_url));
    });

    let error_url = url.to_string();
    ws.add_event_listener(move |_: SocketErrorEvent| {
        console!(log, format!("connection errored: {}", error_url));
    });

    let close_url = url.to_string();
    ws.add_event_listener(move |event: SocketCloseEvent| {
        console!(log, format!("connection closed: {} : {:?}", close_url, event.reason()));
    });

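    ws.add_event_listener(move |event: SocketMessageEvent| {
        // Ignore binary frames and malformed JSON instead of panicking in the handler.
        let parsed = event.data().into_text()
            .and_then(|text| serde_json::from_str::<ToClient>(&text).ok());
        if let Some(to_client) = parsed {
            // send only fails if the receiver was dropped; nothing useful to do then.
            let _ = sender.send(to_client);
        }
    });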

    Some(ws)
}


impl<ToClientMsg> WSAuto<ToClientMsg>
where
    // Clone is required because #[derive(Clone)] on WSAuto adds a `ToClientMsg: Clone` bound.
    ToClientMsg: serde::de::DeserializeOwned + 'static + Clone,
{
    pub fn new(url: String, sender: mpsc::Sender<ToClientMsg>) -> WSAuto<ToClientMsg> {
        let ans = WSAuto( Rc::new( WSAutoData {
            rc_ows: RefCell::new(None),
            url,
            sender
        } ) );
        ans.force_reconnect();
        ans
    }

    fn get_status(&self) -> Option<SocketReadyState> {
        self.0.rc_ows.borrow().as_ref().map(|ws| ws.ready_state())
    }

    pub fn is_ready(&self) -> bool {
        match self.0.rc_ows.borrow().as_ref() {
            None => false,
            Some(ws) => ws.ready_state() == SocketReadyState::Open
        }
    }

    fn force_reconnect(&self) {
        // Replace any existing socket with a fresh one.
        let ws = create_ws(&self.0.url, self.0.sender.clone());
        *self.0.rc_ows.borrow_mut() = ws;

        // Retry in 5 seconds if the socket has not reached the Open state by then.
        let obj = self.clone();
        stdweb::web::set_timeout(
            move || { if !obj.is_ready() { obj.force_reconnect(); } },
            5000,
        );
    }

    pub fn ensure_connected(&self) {
        match self.get_status() {
            Some(SocketReadyState::Open) => {},
            Some(SocketReadyState::Connecting) => {},
            _ => self.force_reconnect()
        }
    }

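    pub fn update<ToServerMsg: serde::Serialize>(&self, to_server: &mpsc::Receiver<ToServerMsg>) {
        if !self.is_ready() { return; }

        // A shared borrow is enough; send_text takes &self.
        let guard = self.0.rc_ows.borrow();
        let ws = match guard.as_ref() { Some(ws) => ws, None => return };

        for msg in to_server.try_iter() {
            if let Ok(text) = serde_json::to_string(&msg) {
                // A failed send drops the message; there is no retry or ack here.
                let _ = ws.send_text(&text);
            }
        }
    }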

}

0 Likes

#4

How strong a delivery guarantee do you need? If it’s only for user experience, then error handling on the websocket interface should be enough (a bad connection will eventually disconnect and cause send errors).

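If you need it for the integrity of the data or application, that’s harder: confirming delivery over an unreliable link is the Two Generals’ Problem (a close relative of the Byzantine generals problem). You’ll need to decide what should happen when the connection breaks at the worst possible moment, and handle that case in the application.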
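
One common way to pin down those semantics, offered here only as a sketch, is at-least-once delivery: the sender retries until it hears back, and the receiver makes duplicates harmless by remembering message ids. `Dedup` and `Account` are made-up names, and the set of seen ids grows without bound here; a real version would need to prune it.

```rust
use std::collections::HashSet;

/// Receiver-side filter that makes redelivered messages harmless.
pub struct Dedup {
    seen: HashSet<u64>,
}

impl Dedup {
    pub fn new() -> Self {
        Dedup { seen: HashSet::new() }
    }

    /// Returns true the first time an id is seen, false for duplicates.
    pub fn first_time(&mut self, id: u64) -> bool {
        self.seen.insert(id)
    }
}

/// Hypothetical application state changed by incoming messages.
pub struct Account {
    pub balance: i64,
    dedup: Dedup,
}

impl Account {
    pub fn new() -> Self {
        Account { balance: 0, dedup: Dedup::new() }
    }

    /// Apply a deposit at most once per message id.
    /// Returns false if this id was already applied.
    pub fn apply_deposit(&mut self, msg_id: u64, amount: i64) -> bool {
        if !self.dedup.first_time(msg_id) {
            return false;
        }
        self.balance += amount;
        true
    }
}
```

With this in place a client that is unsure whether a message got through can simply send it again with the same id.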

1 Like

#5

Good point, this is not a websocket problem. This is a Byzantine generals problem. I need to rethink the design and the guarantees I actually need.

1 Like