Rust WebSocket server performs much worse than a Go WebSocket server

I'm trying to create a WebSocket server. I have tried four libraries; here are all of them:

tokio-websockets

use futures_util::{SinkExt, StreamExt};
use serde::{Deserialize, Serialize};
use tokio::net::TcpListener;
use tokio_websockets::{Error, Message, ServerBuilder};

/// JSON payload exchanged with clients, e.g. `{"num": 42}`.
#[derive(Debug, Deserialize, Serialize)]
struct Msg {
    /// The number echoed back to the client.
    num: i32,
}

/// Listens on 127.0.0.1:9001 and serves every accepted TCP connection as a
/// WebSocket echo session on its own Tokio task.
///
/// A failed `accept` (for example running out of file descriptors under a
/// load test) is logged and the loop keeps accepting. A
/// `while let Ok(..) = listener.accept()` loop would instead fall out and end
/// the server silently on the first such error.
/// NOTE(review): consider a short backoff on repeated accept errors so the loop
/// does not spin while the process is out of descriptors.
#[tokio::main]
async fn main() -> Result<(), Error> {
    let listener = TcpListener::bind("127.0.0.1:9001").await.unwrap();
    println!("WebSocket server started on :9001");

    loop {
        let (stream, _) = match listener.accept().await {
            Ok(conn) => conn,
            Err(e) => {
                eprintln!("Failed to accept TCP connection: {}", e);
                continue;
            }
        };

        tokio::spawn(async move {
            match ServerBuilder::new().accept(stream).await {
                Ok((_request, ws_stream)) => echo(ws_stream).await,
                Err(e) => eprintln!("Failed to accept WebSocket connection: {}", e),
            }
        });
    }
}

/// Echo loop for one client.
///
/// Each text frame is parsed as a [`Msg`], and the re-serialized JSON is sent
/// back. Frames for which `as_text()` is `None` (binary and control frames) are
/// ignored. A text frame that is not valid `Msg` JSON is logged and skipped
/// without closing the connection. The loop ends when the stream is exhausted,
/// a read fails (logged), or a send fails (logged).
async fn echo(mut ws_stream: tokio_websockets::WebSocketStream<tokio::net::TcpStream>) {
    while let Some(next) = ws_stream.next().await {
        let msg = match next {
            Ok(msg) => msg,
            Err(e) => {
                eprintln!("Error reading message: {}", e);
                break;
            }
        };

        // Only text frames carry JSON. `as_text` already rejects binary frames,
        // so a separate `is_text() || is_binary()` check adds nothing.
        let text = match msg.as_text() {
            Some(text) => text,
            None => continue,
        };

        let json_msg = match serde_json::from_str::<Msg>(text) {
            Ok(json_msg) => json_msg,
            Err(e) => {
                eprintln!("Error reading json: {}", e);
                continue;
            }
        };

        let response_text = match serde_json::to_string(&json_msg) {
            Ok(response_text) => response_text,
            Err(e) => {
                eprintln!("Error serializing json: {}", e);
                continue;
            }
        };

        if let Err(e) = ws_stream.send(Message::text(response_text)).await {
            eprintln!("Error sending json: {}", e);
            break;
        }
    }
}

tokio-tungstenite

use std::{env, io::Error};
use futures_util::{StreamExt, SinkExt};
use tokio::net::{TcpListener, TcpStream};
use tokio_tungstenite::tungstenite::Message;
use tokio_tungstenite::accept_async;
use futures::StreamExt;

/// Starts the echo server on the address given as the first CLI argument
/// (default `0.0.0.0:9001`) and spawns one task per accepted TCP connection.
///
/// # Errors
/// Returns the I/O error if the address cannot be bound. A failed `accept` is
/// logged and the loop keeps going, instead of silently ending the server.
#[tokio::main]
async fn main() -> Result<(), Error> {
    let addr = env::args().nth(1).unwrap_or_else(|| "0.0.0.0:9001".to_string());

    let listener = TcpListener::bind(&addr).await?;
    println!("WebSocket server running on ws://{}", addr);

    loop {
        match listener.accept().await {
            Ok((stream, _)) => {
                tokio::spawn(handle_connection(stream));
            }
            Err(e) => eprintln!("Failed to accept connection: {}", e),
        }
    }
}

/// Performs the WebSocket handshake on `stream` and echoes every text or binary
/// message back unchanged until the peer disconnects or an error occurs.
///
/// A failed handshake or a failed send is logged and ends only this
/// connection. Previously `expect`/`unwrap` panicked the task. The stream is
/// used directly instead of being `split()`, because reads and writes
/// alternate, so the split halves' shared lock is pure overhead.
///
/// NOTE(review): unlike the other servers in this comparison, this one does not
/// parse and re-serialize `{"num": ..}` JSON, so its numbers are not directly
/// comparable. Also, the snippet's imports bring `StreamExt` in from both
/// `futures_util` and `futures`; rustc rejects that duplicate name (E0252), so
/// one of those `use` lines has to go.
async fn handle_connection(stream: TcpStream) {
    let mut ws_stream = match accept_async(stream).await {
        Ok(ws_stream) => ws_stream,
        Err(e) => {
            eprintln!("WebSocket handshake failed: {}", e);
            return;
        }
    };

    while let Some(Ok(msg)) = ws_stream.next().await {
        if msg.is_text() || msg.is_binary() {
            if let Err(e) = ws_stream.send(msg).await {
                eprintln!("Error sending message: {}", e);
                break;
            }
        }
    }
}

Raw tungstenite (on `may` coroutines)

#[macro_use]
extern crate may;

use may::net::TcpListener;
use tungstenite::{
    accept_hdr, handshake::server::{Request, Response}, Message
};
use serde::{Deserialize, Serialize};
use std::sync::{Arc, Mutex};

/// Wire format of a client message, e.g. `{"num": 42}`.
#[derive(Deserialize, Serialize, Debug)]
struct Msg {
    /// Value that is parsed and sent straight back.
    num: i32,
}

/// WebSocket echo server on `may` coroutines, listening on 127.0.0.1:9001.
///
/// Each accepted stream is handled in its own `go!` coroutine. The handshake
/// adds a `MyCustomHeader: :)` response header. Each text frame is then parsed
/// as [`Msg`] and echoed back as re-serialized JSON. A frame that fails to
/// parse, a failed send, or a read error ends the connection.
///
/// NOTE: an earlier version pushed a `try_clone` of every socket into a shared
/// `Arc<Mutex<Vec<_>>>` that was never read and never pruned. That kept one
/// file descriptor open per connection for the life of the process, which is
/// fatal over a long benchmark, and the `unwrap` on `try_clone` could panic.
/// The registry has been removed. If broadcasting is added later, entries
/// must be removed when their connection ends.
fn main() {
    // "Gagal bind ke port 9001" is Indonesian for "failed to bind to port 9001".
    let server = TcpListener::bind("127.0.0.1:9001").expect("Gagal bind ke port 9001");
    println!("WebSocket Server running on ws://127.0.0.1:9001");

    for stream in server.incoming() {
        go!(move || {
            let stream = match stream {
                Ok(s) => s,
                Err(_) => return,
            };

            // Handshake hook: decorates the 101 response with a custom header.
            let callback = |_: &Request, mut response: Response| {
                let headers = response.headers_mut();
                headers.append("MyCustomHeader", ":)".parse().unwrap());
                Ok(response)
            };

            let mut websocket = match accept_hdr(stream, callback) {
                Ok(ws) => ws,
                Err(_) => return,
            };

            loop {
                match websocket.read() {
                    Ok(msg) => {
                        if msg.is_text() {
                            // Cannot fail: guarded by `is_text()` above.
                            let received_text = msg.to_text().unwrap();
                            match serde_json::from_str::<Msg>(received_text) {
                                Ok(parsed_msg) => {
                                    let response = serde_json::to_string(&parsed_msg).unwrap();
                                    if websocket.send(Message::Text(response.into())).is_err() {
                                        break;
                                    }
                                }
                                Err(_) => break,
                            }
                        }
                    }
                    Err(_) => {
                        let _ = websocket.close(None);
                        break;
                    }
                }
            }
        });
    }
}

Ntex

use ntex::service::fn_factory_with_config;
use std::io;
use ntex::web;
use serde::{Deserialize, Serialize};

/// JSON payload exchanged with clients, e.g. `{"num": 42}`.
///
/// Derives `Debug` like the `Msg` types of the other servers, so it can be
/// logged the same way.
#[derive(Serialize, Deserialize, Debug)]
struct Msg {
    num: i32,
}

/// Builds the per-connection frame handler used by `ws_index`.
///
/// - Text frames are parsed as [`Msg`] and answered with the re-serialized
///   JSON. Non-UTF-8 payloads and non-`Msg` JSON get no reply.
/// - Binary frames are echoed unchanged.
/// - Ping frames are answered with a Pong carrying the same payload.
/// - Close frames are answered with a Close carrying the same reason.
/// - Every other frame is ignored.
///
/// `_sink` is unused because every reply is returned from the service.
async fn ws_service(
    _sink: web::ws::WsSink,
) -> Result<impl ntex::service::Service<
    web::ws::Frame,
    Response = Option<web::ws::Message>,
    Error = io::Error,
>, web::Error> {
    let service = ntex::fn_service(move |frame: web::ws::Frame| async move {
        match frame {
            web::ws::Frame::Text(text) => {
                let text_str = std::str::from_utf8(text.as_ref()).unwrap_or("");
                match serde_json::from_str::<Msg>(text_str) {
                    Ok(msg) => {
                        let json_str = serde_json::to_string(&msg).unwrap();
                        Ok(Some(web::ws::Message::Text(json_str.into())))
                    }
                    Err(_) => Ok(None),
                }
            }
            web::ws::Frame::Binary(bin) => Ok(Some(web::ws::Message::Binary(bin))),
            // Without this arm pings fall into `_` below and never get a Pong.
            web::ws::Frame::Ping(payload) => Ok(Some(web::ws::Message::Pong(payload))),
            web::ws::Frame::Close(reason) => Ok(Some(web::ws::Message::Close(reason))),
            _ => Ok(None),
        }
    });
    Ok(service)
}

/// HTTP handler for `GET /`: upgrades `req` to a WebSocket whose frames are
/// handled by the service built in `ws_service`.
async fn ws_index(req: web::HttpRequest) -> Result<web::HttpResponse, web::Error> {
    let factory = fn_factory_with_config(ws_service);
    web::ws::start(req, factory).await
}

/// Entry point: serves the WebSocket endpoint at `GET /` on 127.0.0.1:9001.
///
/// # Errors
/// Propagates the I/O error if the address cannot be bound or the server fails.
#[ntex::main]
async fn main() -> std::io::Result<()> {
    println!("WebSocket server started on :9001");

    let server = web::server(|| {
        let ws_route = web::resource("/").route(web::get().to(ws_index));
        web::App::new().service(ws_route)
    })
    .bind("127.0.0.1:9001")?;

    server.run().await
}

The Go version uses Gorilla WebSocket; the code looks like this:

package main

import (
        "fmt"
        "github.com/gorilla/websocket"
        "net/http"
)

// msg is the JSON payload read from and written back to clients,
// e.g. {"num": 42}.
type msg struct {
	Num int `json:"num"` // serialized under the lowercase key "num"
}

// upgrader turns incoming HTTP requests into WebSocket connections.
// CheckOrigin approves every request regardless of its Origin header, which
// is fine for a local benchmark but not for a public-facing server.
var upgrader = websocket.Upgrader{
	CheckOrigin: func(*http.Request) bool { return true },
}

// main routes "/" to wsHandler and serves HTTP on port 9001 on all
// interfaces. ListenAndServe only returns on failure, and its error is
// turned into a panic.
func main() {
	http.HandleFunc("/", wsHandler)
	fmt.Println("WebSocket server started on :9001")

	err := http.ListenAndServe(":9001", nil)
	panic(err)
}

// wsHandler upgrades the request to a WebSocket and runs the echo loop until
// the client goes away.
//
// net/http already calls each handler on its own goroutine, so echo runs
// synchronously here. Starting it with `go echo(conn)` next to
// `defer conn.Close()` made the handler return at once, and the deferred
// Close shut the connection before echo could read from it. That is why
// almost no messages were answered.
func wsHandler(w http.ResponseWriter, r *http.Request) {
	conn, err := upgrader.Upgrade(w, r, nil)
	if err != nil {
		// Upgrade has already sent an HTTP error response to the client;
		// writing another one with http.Error would be a superfluous write.
		fmt.Println("Could not open websocket connection:", err)
		return
	}
	defer conn.Close()

	echo(conn)
}

// echo reads JSON messages of the form {"num": ...} from conn and writes each
// one back, until a read or write fails.
//
// A normal client close (1000 normal closure, 1001 going away) ends the loop
// quietly, because it is not an error. Any other read failure, including
// malformed JSON, and any write failure is printed before returning.
func echo(conn *websocket.Conn) {
	for {
		var m msg
		if err := conn.ReadJSON(&m); err != nil {
			if !websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway) {
				fmt.Println("Error reading json:", err)
			}
			return
		}

		if err := conn.WriteJSON(m); err != nil {
			fmt.Println("Error sending json:", err)
			return
		}
	}
}

I benchmarked with k6 using the following simple script:


import ws from 'k6/ws';
import { check } from 'k6';

// Load profile: 100 concurrent virtual users looping the default function for 2 s.
export const options = {
  duration: '2s',
  vus: 100,
};

// One iteration: open a WebSocket, send {"num":42} once it is open, and close
// it after 5 s. There is no 'message' handler, so every session lasts the
// full 5 s no matter when the echo arrives.
export default function () {
  const url = 'ws://127.0.0.1:9001';
  const payload = JSON.stringify({ num: 42 });

  const res = ws.connect(url, (socket) => {
    socket.on('open', () => {
      socket.send(payload);
    });
    socket.setTimeout(() => {
      socket.close();
    }, 5000);
  });

  check(res, { 'Connected successfully': (r) => r && r.status === 101 });
}

The Rust results always look poor like this, and I don't know why:

     execution: local
        script: ws.js
        output: -

     scenarios: (100.00%) 1 scenario, 100 max VUs, 32s max duration (incl. graceful stop):
              * default: 100 looping VUs for 2s (gracefulStop: 30s)


     ✓ Connected successfully

     checks................: 100.00% 100 out of 100
     data_received.........: 14 kB   2.8 kB/s
     data_sent.............: 22 kB   4.3 kB/s
     iteration_duration....: avg=5s     min=5s       med=5s     max=5.01s   p(90)=5.01s   p(95)=5.01s
     iterations............: 100     19.929403/s
     vus...................: 100     min=100        max=100
     vus_max...............: 100     min=100        max=100
     ws_connecting.........: avg=7.75ms min=305.79µs med=8.57ms max=15.46ms p(90)=12.24ms p(95)=15.04ms
     ws_msgs_received......: 100     19.929403/s
     ws_msgs_sent..........: 100     19.929403/s
     ws_session_duration...: avg=5s     min=5s       med=5s     max=5.01s   p(90)=5.01s   p(95)=5.01s
     ws_sessions...........: 100     19.929403/s

whereas the Go result looks much better:

    execution: local
        script: ws.js
        output: -

     scenarios: (100.00%) 1 scenario, 100 max VUs, 32s max duration (incl. graceful stop):
              * default: 100 looping VUs for 2s (gracefulStop: 30s)


     ✓ Connected successfully

     checks................: 100.00% 19995 out of 19995
     data_received.........: 2.6 MB  1.3 MB/s
     data_sent.............: 4.2 MB  2.1 MB/s
     iteration_duration....: avg=9.95ms min=300.99µs med=8.54ms max=56.6ms  p(90)=18.55ms p(95)=22.68ms
     iterations............: 19995   9964.616943/s
     vus...................: 100     min=100            max=100
     vus_max...............: 100     min=100            max=100
     ws_connecting.........: avg=8.99ms min=142.55µs med=7.79ms max=46.99ms p(90)=16.77ms p(95)=20.59ms
     ws_msgs_received......: 11      5.48191/s
     ws_msgs_sent..........: 19995   9964.616943/s
     ws_session_duration...: avg=9.82ms min=257.29µs med=8.43ms max=56.55ms p(90)=18.36ms p(95)=22.51ms
     ws_sessions...........: 19995   9964.616943/s

Any advice would be appreciated.

I think the WebSocket doesn't even send messages properly. Your average iteration time is exactly 5 seconds, which is also the timeout you set.

Also, why is the number of messages sent in your Go version different from the number received? Is your Go implementation only handling 11 of 19995 connections?

Edit: I think I have it now:
Your WebSocket only closes after 5 seconds, which is why your Rust implementation gives you exactly 100 connections in 2 seconds — the number of VUs.

The Go implementation drops tons of connections; it only answers 11 of them. The rest of the data you send goes nowhere, and k6 can keep opening new connections, driving up your data sent and received.

1 Like

So does that mean the Rust implementations are behaving normally? Then closing the connection is up to the client, right? Does that mean we don't have to close it on the server side? And does that mean the Go implementation is the problem? I didn't notice there was a "messages received" line :< My native language is not English, and the report is not as neat as a wrk report for HTTP :< Do you know any reliable tools for benchmarking WebSockets?

In your k6 config you should:

  • listen for the returned message
  • then close the connection manually in the handler
  • also fail the check() function if the message is not returned
    (I do not know the API for these)

This should allow your tester to open more connections. Also change the duration from 2s to something longer, or you won't see the effect of Go's garbage collection.

When you close the connection in your k6 script, the server will also be notified and close its stream (though I do not know exactly what these implementations do on a socket close).

From the benchmark, your Rust code looks fine: it handles all 100 connections correctly.

1 Like

Yay! I edited the script like this:

import ws from 'k6/ws';
import { check } from 'k6';

// Load profile: 100 concurrent virtual users looping the default function for 30 s.
export const options = {
  duration: '30s',
  vus: 100,
};

// One iteration: connect, send {"num":42}, close as soon as the echo arrives,
// and give up after 5 s if it never does. The checks fail when the upgrade
// is not a 101 or when no reply arrived.
//
// There is no message counter here. k6 gives every VU its own JS runtime, so
// a module-level `total` only counts one VU's messages, and "Current total"
// was misleading. The built-in ws_msgs_received metric reports the aggregate.
// Per-message console.log calls were also dropped, because logging on every
// message adds load-generator overhead that can skew the throughput numbers.
export default function () {
  const url = 'ws://127.0.0.1:9001';
  let messageReceived = false;

  const res = ws.connect(url, function (socket) {
    socket.on('open', function () {
      socket.send(JSON.stringify({ num: 42 }));
    });

    socket.on('message', function () {
      messageReceived = true;
      socket.close();
    });

    socket.setTimeout(function () {
      if (!messageReceived) {
        console.log('No message received within timeout period');
        socket.close();
      }
    }, 5000);
  });

  check(res, {
    'Connected successfully': (r) => r && r.status === 101,
    'Message received': () => messageReceived,
  });
}

The Rust result is pretty amazing: 49,000 messages :wink:

 checks................: 100.00% 98046 out of 98046
     data_received.........: 6.9 MB  230 kB/s
     data_sent.............: 11 MB   354 kB/s
     iteration_duration....: avg=61.2ms   min=392.68µs med=68.01ms  max=158.02ms p(90)=90.18ms  p(95)=96.22ms
     iterations............: 49023   1632.068219/s
     vus...................: 100     min=100            max=100
     vus_max...............: 100     min=100            max=100
     ws_connecting.........: avg=601.99µs min=93.48µs  med=406.52µs max=26.67ms  p(90)=921.36µs p(95)=1.26ms
     ws_msgs_received......: 49023   1632.068219/s
     ws_msgs_sent..........: 49023   1632.068219/s
     ws_session_duration...: avg=61.13ms  min=350.24µs med=67.94ms  max=157.96ms p(90)=90.1ms   p(95)=96.15ms
     ws_sessions...........: 49023   1632.068219/s

The Go one still failed, but that's okay because my original goal was to compare different Rust WebSocket implementations and choose the fastest one :wink: So I can get back to my original goal ^^ Thank you!

3 Likes