TLS websocket: How to make tungstenite work with mio (for Poll) and secure websocket (wss) via the "native-tls" feature of the tungstenite crate

I'm using tungstenite (with the native-tls feature for its TlsStream) together with mio (for Poll) to implement a client program that connects to a target secure websocket (wss).

I've come across some very good code that is very close to what I want, as seen here. That code works with a non-secure websocket connection, but it fails with HTTP error code 400 (Bad Request) if I swap in a wss URL instead. So I'm on the right path; the stream just needs to support TLS.

My objective is to use Poll to receive incoming messages from the websocket, while also being able to push messages to it from within the main message loop.

After some research, I modified the initial setup code as follows:

fn main() {
    // Create a poll instance.
    let mut poll = Poll::new().unwrap();
    const STREAM_TOKEN: Token = Token(10);
    const WAKE_TOKEN: Token = Token(20);
    // Create storage for events.
    let mut events = Events::with_capacity(128);
    let std_stream = StdStream::connect("target-hostname.com:443").unwrap();

    // for tls: `connect` takes the bare domain name (used for certificate validation), not a URL
    let stream = TlsConnector::new().unwrap().connect("target-hostname.com", std_stream).unwrap();

    let hs = ClientHandshake::start(
        stream,
        String::from("wss://target-hostname.com/topic")
            .into_client_request()
            .unwrap(),
        None,
    )
    .unwrap();
    ...
    // Register the socket with `Poll`
    poll.registry()
        .register(ws.get_mut(), STREAM_TOKEN, Interest::READABLE)    // <------- problem
        .unwrap();
    ...

As commented in the code, native_tls::TlsStream doesn't implement the mio::event::Source trait, so it cannot be passed to Registry::register (reached via Poll::registry) as done in the code. It produces the following error:

error[E0277]: the trait bound `TlsStream<std::net::TcpStream>: Source` is not satisfied
   --> src/main.rs:53:19
    |
53  |         .register(ws.get_mut(), STREAM_TOKEN, Interest::READABLE)
    |          -------- ^^^^^^^^^^^^ the trait `Source` is not implemented for `TlsStream<std::net::TcpStream>`
    |          |
    |          required by a bound introduced by this call
    |
note: required by a bound in `Registry::register`
   --> /home/something/.cargo/registry/src/github.com-1ecc6299db9ec823/mio-0.8.0/src/poll.rs:499:12
    |
499 |         S: event::Source + ?Sized,
    |            ^^^^^^^^^^^^^ required by this bound in `Registry::register`

I looked at how mio::net::TcpStream implements mio::event::Source here, but I'm not sure how to proceed, as the underlying IoSource would need to be handled across the board from the start.

So, my question is:

How can I make tungstenite with the native-tls feature work with mio (for its Poll feature)?

You would need to register the underlying TCP stream with mio, just as you did without TLS. How exactly that is best done, I am not sure.

Thank you for the suggestion. I also had that in mind but wasn't sure how to proceed. After checking the relevant crates, it seemed too complicated for this requirement. In the end I went with a simpler approach, which I describe in another comment.

To answer my own question:

PS: I won't mark this as the solution just yet. Although it works, it doesn't fully achieve what I was aiming for, which is making mio work with tungstenite.

A simpler way to get things done is to use the tungstenite::connect function, which returns a WebSocket<MaybeTlsStream<TcpStream>> (together with the handshake response) and lets the underlying code handle TLS. Going this way, I had to drop mio (and thus cannot use its Poll). Then get the underlying std::net::TcpStream from the socket returned by that function, and either set a read timeout on it or make it non-blocking. See the std::net::TcpStream docs for set_read_timeout and set_nonblocking.

It's not as clean or performant as I'd like, since I really wanted to use mio's Poll, but at least it works. Adjust the timeout appropriately, or decide whether to use non-blocking mode instead. I did it successfully with one spawned thread that signals the main message loop (in the main thread) when a message needs to be pushed. This setup needs two std::sync::mpsc::sync_channel channels so the two threads can communicate and block-wait on each other before continuing.

The setup code can look similar to this:

    let (mut socket, response) = connect(Url::parse("wss://target-hostname.com/topic").unwrap()).expect("Can't connect");

    match socket.get_mut() {
        MaybeTlsStream::NativeTls(t) => {
            // -- use either one or another
            //t.get_mut().set_nonblocking(true);
            t.get_mut().set_read_timeout(Some(Duration::from_millis(100))).expect("Error: cannot set read-timeout to underlying stream");
        },
        // handle more cases as necessary, this one only focuses on native-tls
        _ => panic!("Error: it is not TlsStream")
    }
    ...
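
To illustrate what the read timeout changes for the main loop, here is a minimal std-only sketch. It is not taken from my project: `ReadOutcome`, `try_read`, and `demo_timeout` are hypothetical names, and the loopback listener only stands in for the remote server. The point is that once a read timeout is set, a read on a silent socket returns an error of kind WouldBlock (Unix) or TimedOut (Windows) instead of blocking forever, and the loop should treat that as "nothing yet" rather than a failure. As far as I know, tungstenite surfaces the same condition as an I/O error from `read_message()`.

```rust
use std::io::{ErrorKind, Read, Write};
use std::net::{TcpListener, TcpStream};
use std::time::Duration;

/// Outcome of a single read attempt (hypothetical helper type).
#[derive(Debug, PartialEq)]
enum ReadOutcome {
    Data(usize), // this many bytes were read
    NothingYet,  // the timeout expired (or the call would block)
    Closed,      // the peer closed the connection
}

/// Read once, mapping "timed out" / "would block" to `NothingYet`.
fn try_read(stream: &mut TcpStream, buf: &mut [u8]) -> std::io::Result<ReadOutcome> {
    match stream.read(buf) {
        Ok(0) => Ok(ReadOutcome::Closed),
        Ok(n) => Ok(ReadOutcome::Data(n)),
        // An expired read timeout is WouldBlock on Unix and TimedOut on Windows.
        Err(e) if matches!(e.kind(), ErrorKind::WouldBlock | ErrorKind::TimedOut) => {
            Ok(ReadOutcome::NothingYet)
        }
        Err(e) => Err(e),
    }
}

/// Loopback demo: the first read hits the timeout, the second one sees data.
fn demo_timeout() -> std::io::Result<(ReadOutcome, ReadOutcome)> {
    // A local listener stands in for the remote server (assumption for the demo).
    let listener = TcpListener::bind("127.0.0.1:0")?;
    let mut client = TcpStream::connect(listener.local_addr()?)?;
    let (mut server_side, _) = listener.accept()?;

    // Same call as in the setup code above, applied to a plain TcpStream.
    client.set_read_timeout(Some(Duration::from_millis(100)))?;

    let mut buf = [0u8; 16];
    let first = try_read(&mut client, &mut buf)?; // peer is silent
    server_side.write_all(b"ping")?;
    let second = try_read(&mut client, &mut buf)?; // peer has written 4 bytes
    Ok((first, second))
}
```

In a main loop, `NothingYet` is the moment to check for other pending work (such as a heartbeat request) before trying to read again.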

For communication between threads, you would have something like this:

    let (sender, receiver) : (SyncSender<MsgType>, _) = sync_channel(0);
    let (rev_sender, rev_receiver) : (SyncSender<MsgType>, _) = sync_channel(0);

    // heartbeat action trigger thread
    std::thread::spawn(move || {
        // Sleep for a duration of time, then
        // use `sender.send(MsgType::PingMsg)` to let the main thread know it needs to send a Ping message
        // to the websocket.
        // After that, use `rev_receiver.recv()` to wait for confirmation that the main thread sent the Ping message successfully,
        // so we don't continue the loop and sleep unnecessarily, which would queue up more triggers.
    });

    // main message loop
    // note: in main thread, no need to spawn a new thread
    loop {
        // Use `receiver.try_recv()` (non-blocking) to give priority to any action triggered by the heartbeat thread;
        // if there is one, send a Ping message to the websocket.
        // After sending successfully, call `rev_sender.send(MsgType::PongMsg)` to let the heartbeat thread
        // know that we finished sending the ping message, so that the heartbeat thread can continue
        // sleeping and begin the next cycle of sending a heartbeat to the websocket.
        //
        // After all this, use `socket.read_message()` to read an incoming message from the websocket as usual.
    }
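
The handshake between the two threads described in the comments above can be sketched with std alone. This is a simplified illustration, not the code from my project: `run_loop`, its parameters, and the returned log are hypothetical, and the places where the real program would talk to the websocket are only marked with comments. Both channels have capacity 0, so each `send` blocks until the other side picks the message up.

```rust
use std::sync::mpsc::{sync_channel, TryRecvError};
use std::thread;
use std::time::Duration;

#[derive(Debug, PartialEq)]
enum MsgType {
    PingMsg, // heartbeat thread -> main loop: "please send a ping"
    PongMsg, // main loop -> heartbeat thread: "the ping has been sent"
}

/// Runs the main loop until `max_pings` heartbeats were handled and
/// returns a log of the pings that would have been sent.
fn run_loop(interval: Duration, max_pings: usize) -> Vec<String> {
    let (sender, receiver) = sync_channel::<MsgType>(0);
    let (rev_sender, rev_receiver) = sync_channel::<MsgType>(0);

    // Heartbeat trigger thread.
    let heartbeat = thread::spawn(move || loop {
        thread::sleep(interval);
        // Blocks until the main loop takes the request; errors once it is gone.
        if sender.send(MsgType::PingMsg).is_err() {
            break;
        }
        // Wait for confirmation before sleeping again.
        if rev_receiver.recv().is_err() {
            break;
        }
    });

    let mut sent = Vec::new();
    while sent.len() < max_pings {
        match receiver.try_recv() {
            Ok(MsgType::PingMsg) => {
                // The real program would write a Ping frame to the websocket here.
                sent.push(format!("ping {}", sent.len() + 1));
                rev_sender
                    .send(MsgType::PongMsg)
                    .expect("heartbeat thread exited unexpectedly");
            }
            Ok(MsgType::PongMsg) | Err(TryRecvError::Empty) => {}
            Err(TryRecvError::Disconnected) => break,
        }
        // The real program would call `socket.read_message()` here; the read
        // timeout keeps that call from blocking the loop. A short sleep stands in.
        thread::sleep(Duration::from_millis(1));
    }

    // Dropping our channel ends lets the heartbeat thread notice and exit.
    drop(receiver);
    drop(rev_sender);
    heartbeat.join().expect("heartbeat thread panicked");
    sent
}
```

For example, `run_loop(Duration::from_millis(5), 3)` handles three heartbeat requests and then shuts the helper thread down cleanly.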

Thank you so much man! I appreciated...

Hi. Appreciate your kind words.

In case you or others want to migrate away from the polling mechanism to an async-based one using tokio-tungstenite, you can take a look at my complete project haxpor/bybit-shiprekt, which listens to a websocket for liquidation events and includes heartbeat and reconnection support.

PS: This commit migrated away from the polling mechanism illustrated above, as per my findings, to an async-based one via tokio-tungstenite.
