Rust websocket client: send while receiving

I am new to Rust, and I have recently been porting a project from Python to Rust.

In the design, I need an object that can

  1. Receive data from a websocket in a dedicated thread
  2. Send data with the same websocket in some other thread

Of course, atomic (thread-safe) protection of the websocket would be preferable. However, I am having trouble expressing this idea in Rust.

I have naively attempted a solution. Here is some pseudo-Rust code to demonstrate my idea:

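    // Pseudo-Rust: `Websocket` and `Message` stand in for a real crate's types.
    struct WebsocketConnection {
        socket: Option<Websocket>,
    }

    impl WebsocketConnection {
        fn new() -> Self {
            Self { socket: None }
        }

        fn connect(&mut self, url: String) {
            let wss = Websocket::new(url).async_connect_secure(None).wait().unwrap();
            self.socket = Some(wss);
        }

        fn send_msg(&mut self, msg: Message) {
            // Panics if connect() has not been called yet.
            self.socket.as_mut().expect("not connected").write_message(msg)
        }

        fn read_msg(&mut self) -> Message {
            self.socket.as_mut().expect("not connected").read_message()
        }

        // Blocks forever while holding `&mut self`, which is the problem described below.
        fn dispatch_msg(&mut self, process_msg: fn(Message)) {
            loop {
                let msg = self.read_msg();
                process_msg(msg);
            }
        }
    }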

I find the following problems:

  1. When I call dispatch_msg, I hold a mutable borrow of the object and the calling thread is blocked in the loop. That means I cannot ask the same websocket object to send a message from another thread.

  2. I have noticed that there are implementations that split a TCP stream into a reader/writer pair, let (reader, writer) = tcpstream.split(), but I cannot find one for an SSL-wrapped client. Specifically, I have searched the crates websocket and tungstenite (tokio and async) and cannot find any example code that lets me use separate channels. Even if I could, I have no idea how to cope with the borrowing rules, since I have put everything in the same struct.
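For reference, the reader/writer split mentioned above can be sketched for a plain (non-TLS) TCP socket using only the standard library, where TcpStream::try_clone gives a second handle to the same socket. This is only an illustration of the idea: the names spawn_echo_server and send_while_receiving are made up for the demo, and the sketch does not cover TLS or the websocket protocol. For websockets, the async route usually goes through the split method of the futures crate's StreamExt, which works on types that are both a Stream and a Sink.

```rust
use std::io::{BufRead, BufReader, Write};
use std::net::{SocketAddr, TcpListener, TcpStream};
use std::thread;

/// Starts a throwaway local echo server for the demo and returns its address.
/// (Hypothetical helper, only here so the example can run on its own.)
fn spawn_echo_server() -> std::io::Result<SocketAddr> {
    let listener = TcpListener::bind("127.0.0.1:0")?;
    let addr = listener.local_addr()?;
    thread::spawn(move || {
        if let Ok((mut conn, _)) = listener.accept() {
            let mut reader = BufReader::new(conn.try_clone().expect("clone failed"));
            let mut line = String::new();
            // Echo each line back until the client closes the connection.
            while reader.read_line(&mut line).map(|n| n > 0).unwrap_or(false) {
                if conn.write_all(line.as_bytes()).is_err() {
                    break;
                }
                line.clear();
            }
        }
    });
    Ok(addr)
}

/// Sends `lines` from the calling thread while a second thread reads the echoes.
fn send_while_receiving(lines: &[&str]) -> std::io::Result<Vec<String>> {
    let addr = spawn_echo_server()?;
    let mut writer = TcpStream::connect(addr)?;
    // Second handle to the same socket: one handle per thread, no shared &mut.
    let reader = writer.try_clone()?;

    let expected = lines.len();
    let receiver = thread::spawn(move || {
        BufReader::new(reader)
            .lines()
            .take(expected)
            .collect::<std::io::Result<Vec<String>>>()
    });

    for line in lines {
        writer.write_all(line.as_bytes())?;
        writer.write_all(b"\n")?;
    }

    receiver.join().expect("reader thread panicked")
}

fn main() -> std::io::Result<()> {
    let echoed = send_while_receiving(&["hello", "world"])?;
    println!("{:?}", echoed);
    Ok(())
}
```

The point of the sketch is that each thread owns its own handle, so neither needs a mutable borrow of a shared struct.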

I would like to ask:

  1. Is there any example code that solves my situation the way I imagined?
  2. Does my code above have a design flaw? How should I reorganize my idea into code that works in Rust?
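One possible way to reorganize the design, sketched here with the standard library only, is to stop sharing one object between threads: a receiving thread owns the read half, a sending thread owns the write half, and other threads send through a clonable channel sender. The Connection type, its methods, and the line-based "messages" are assumptions made for this illustration; a real websocket crate would supply its own halves and message type.

```rust
use std::io::{BufRead, Cursor, Write};
use std::sync::mpsc::{self, Sender};
use std::thread::{self, JoinHandle};

/// Handle to a connection whose two halves live on their own threads.
struct Connection<W> {
    outgoing: Sender<String>,
    reader: JoinHandle<()>,
    writer: JoinHandle<W>,
}

impl<W: Write + Send + 'static> Connection<W> {
    fn spawn<R, F>(read_half: R, mut write_half: W, mut on_msg: F) -> Self
    where
        R: BufRead + Send + 'static,
        F: FnMut(String) + Send + 'static,
    {
        let (outgoing, queue) = mpsc::channel::<String>();
        // Receiving thread: owns the read half and dispatches every line.
        let reader = thread::spawn(move || {
            for line in read_half.lines() {
                match line {
                    Ok(msg) => on_msg(msg),
                    Err(_) => break,
                }
            }
        });
        // Sending thread: owns the write half and drains the queue
        // until every sender has been dropped.
        let writer = thread::spawn(move || {
            for msg in queue {
                if writeln!(write_half, "{}", msg).is_err() {
                    break;
                }
            }
            write_half
        });
        Self { outgoing, reader, writer }
    }

    /// Clonable sender; hand one to each thread that wants to send.
    fn sender(&self) -> Sender<String> {
        self.outgoing.clone()
    }

    /// Drops this handle's sender, waits for both threads, and returns the
    /// write half. Blocks if clones of the sender are still alive elsewhere.
    fn close(self) -> W {
        drop(self.outgoing);
        self.reader.join().expect("reader thread panicked");
        self.writer.join().expect("writer thread panicked")
    }
}

/// In-memory demo: a Cursor plays the incoming stream, a Vec<u8> the outgoing one.
fn demo() -> (Vec<String>, String) {
    let (seen_tx, seen_rx) = mpsc::channel();
    let incoming = Cursor::new(b"ping\npong\n".to_vec());
    let conn = Connection::spawn(incoming, Vec::<u8>::new(), move |msg| {
        let _ = seen_tx.send(msg);
    });

    // Send from another thread while the reader thread is dispatching.
    let sender = conn.sender();
    thread::spawn(move || sender.send("hello".to_string()).unwrap())
        .join()
        .unwrap();
    conn.sender().send("world".to_string()).unwrap();

    let written = conn.close();
    (seen_rx.iter().collect(), String::from_utf8(written).unwrap())
}

fn main() {
    let (received, sent) = demo();
    println!("received: {:?}", received);
    println!("sent: {:?}", sent);
}
```

Because sending only needs a cloned Sender, no caller ever takes a mutable borrow of the connection, which sidesteps the borrow conflict in dispatch_msg. The same shape carries over to async code, with tasks in place of threads and an async channel in place of std::sync::mpsc.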

The .async_connect_secure(None).wait() makes me worried. Avoid mixing async and sync code like that. Additionally afaik .wait() has not existed for a long time?

I understand your point. The strange thing is that in crate websocket-0.26.2, async_connect() gives me a websocket::client::async::Client, while connect() gives a websocket::client::sync::Client. There is too much detail for me to process between these two structs, so I am going with the suggestions from GitHub issues that sync does not give a splittable SslStream but async "might".

As for the .wait(), I guess I'm still on the learning curve of telling tokio and async-std apart. It would be great if you could point me in the right direction.

It is true that you may need async to split it, but keep in mind that using async requires a different coding style using the async/await feature. You can't just replace .await with .wait().

When writing async code, you can't use many parts of the standard library as they are blocking. Tokio and async-std are basically two competing implementations of an async version of the standard library.

I recommend using Tokio out of the two. You can find a tutorial on the Tokio website https://tokio.rs/

As a new Tokio user, a week or so ago I had what sounds like a similar problem using the NATS client, https://crates.io/crates/nats/all, which does not yet support async.

After much experimentation I found I could call all the nats methods I needed from my async threads without issue except for anything that blocked on receiving messages from NATS. (I make the NATS connection at start up only)

The solution I put in place is to wrap such blocking reads in a tokio "spawn_blocking", which as far as I can tell runs the closure on a real thread and gives back a handle you can then .await on.

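    tokio::spawn(async move {
        ....
        let sub = nc.subscribe("some.subject.*").unwrap();

        loop {
            ...
            // async stuff...
            ...
            // Wrap blocking call to NATS to run in own real thread.
            // A clone is moved into the closure on each pass so that `sub` stays
            // usable on the next iteration (assumes the subscription type is Clone).
            let sub_for_call = sub.clone();
            let res = tokio::task::spawn_blocking(move || sub_for_call.next()).await.unwrap();
            ....
            // More async stuff...
            ...
        }
    });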

I have no idea if that is an optimal/sensible solution but it has been running well so far. Without that wrapping things got very weird.

Of course I should deal with those unwraps.

Your theory about how it works is correct.

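The unwrap on spawn_blocking is not necessarily something you need to handle. Awaiting the handle only yields an error if the spawned closure panicked; whatever sub.next() itself returns is passed through as the success value.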
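To make the two layers of failure concrete, here is a small sketch that uses std::thread instead of Tokio so it runs without any dependencies. It is an analogy, not Tokio's API: thread::spawn(...).join() returns Err when the closure panicked, in the same way that awaiting the handle from spawn_blocking does, while an error returned by the blocking call itself arrives inside the Ok value. The Outcome enum and run_blocking are hypothetical names for this illustration.

```rust
use std::thread;

/// What can happen when a blocking call is run on another thread.
#[derive(Debug, PartialEq)]
enum Outcome {
    /// The call ran and produced a value.
    Value(String),
    /// The call ran and reported its own error (for example, server gone).
    CallFailed(String),
    /// The closure panicked; only this case shows up as the outer Err.
    Panicked,
}

/// Runs `call` on its own thread and separates the two failure layers.
fn run_blocking<F>(call: F) -> Outcome
where
    F: FnOnce() -> Result<String, String> + Send + 'static,
{
    match thread::spawn(call).join() {
        Ok(Ok(value)) => Outcome::Value(value),
        Ok(Err(err)) => Outcome::CallFailed(err),
        Err(_panic_payload) => Outcome::Panicked,
    }
}

fn main() {
    println!("{:?}", run_blocking(|| Ok("message".to_string())));
    println!("{:?}", run_blocking(|| Err("server gone".to_string())));
    // The panic message is printed to stderr by the default panic hook.
    println!("{:?}", run_blocking(|| panic!("bug in the closure")));
}
```

Matching on both layers instead of calling unwrap leaves room to log the error and shut down cleanly in the cases where continuing makes no sense.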


Actually, I'm not sure whether that panic comes from my sync call to sub.next() failing (the server died, for example), from failing to spawn the thread that runs it, or from both.

Ideally, if that goes wrong and I really cannot continue, I'd rather have a clean shutdown and the error logged.
