Im trying to create Websocket server. I have tried 4 libraries, here all of them :
Tokio Websocket
use futures_util::{SinkExt, StreamExt};
use serde::{Deserialize, Serialize};
use tokio::net::TcpListener;
use tokio_websockets::{Error, Message, ServerBuilder};
#[derive(Serialize, Deserialize, Debug)]
struct Msg {
num: i32,
}
#[tokio::main]
async fn main() -> Result<(), Error> {
let listener = TcpListener::bind("127.0.0.1:9001").await.unwrap();
println!("WebSocket server started on :9001");
while let Ok((stream, _)) = listener.accept().await {
tokio::spawn(async move {
match ServerBuilder::new().accept(stream).await {
Ok((_request, ws_stream)) => {
echo(ws_stream).await;
}
Err(e) => {
eprintln!("Failed to accept WebSocket connection: {}", e);
}
}
});
}
Ok(())
}
async fn echo(mut ws_stream: tokio_websockets::WebSocketStream<tokio::net::TcpStream>) {
while let Some(Ok(msg)) = ws_stream.next().await {
if msg.is_text() || msg.is_binary() {
if let Some(text) = msg.as_text() {
match serde_json::from_str::<Msg>(text) {
Ok(json_msg) => {
if let Ok(response_text) = serde_json::to_string(&json_msg) {
if let Err(e) = ws_stream.send(Message::text(response_text)).await {
eprintln!("Error sending json: {}", e);
break;
}
}
}
Err(e) => {
eprintln!("Error reading json: {}", e);
}
}
}
}
}
}
Tokio Tungstenite
use std::{env, io::Error};
use futures_util::{StreamExt, SinkExt};
use tokio::net::{TcpListener, TcpStream};
use tokio_tungstenite::tungstenite::Message;
use tokio_tungstenite::accept_async;
use futures::StreamExt;
#[tokio::main]
async fn main() -> Result<(), Error> {
let addr = env::args().nth(1).unwrap_or_else(|| "0.0.0.0:9001".to_string());
let listener = TcpListener::bind(&addr).await.expect("Failed to bind");
println!("WebSocket server running on ws://{}", addr);
while let Ok((stream, _)) = listener.accept().await {
tokio::spawn(handle_connection(stream));
}
Ok(())
}
async fn handle_connection(stream: TcpStream) {
let ws_stream = accept_async(stream).await.expect("WebSocket handshake failed");
let (mut write, mut read) = ws_stream.split();
while let Some(Ok(msg)) = read.next().await {
if msg.is_text() || msg.is_binary() {
write.send(msg).await.unwrap();
}
}
}
Raw Tungstenite
#[macro_use]
extern crate may;
use may::net::TcpListener;
use tungstenite::{
accept_hdr, handshake::server::{Request, Response}, Message
};
use serde::{Deserialize, Serialize};
use std::sync::{Arc, Mutex};
#[derive(Debug, Serialize, Deserialize)]
struct Msg {
num: i32,
}
fn main() {
let server = TcpListener::bind("127.0.0.1:9001").expect("Gagal bind ke port 9001");
println!("WebSocket Server running on ws://127.0.0.1:9001");
let clients = Arc::new(Mutex::new(Vec::new()));
for stream in server.incoming() {
let clients = Arc::clone(&clients);
go!(move || {
let stream = match stream {
Ok(s) => s,
Err(_) => return,
};
let callback = |_: &Request, mut response: Response| {
let headers = response.headers_mut();
headers.append("MyCustomHeader", ":)".parse().unwrap());
Ok(response)
};
let mut websocket = match accept_hdr(stream, callback) {
Ok(ws) => ws,
Err(_) => return,
};
{
let mut clients_lock = clients.lock().unwrap();
clients_lock.push(websocket.get_ref().try_clone().unwrap());
}
loop {
match websocket.read() {
Ok(msg) => {
if msg.is_text() {
let received_text = msg.to_text().unwrap();
match serde_json::from_str::<Msg>(received_text) {
Ok(parsed_msg) => {
let response = serde_json::to_string(&parsed_msg).unwrap();
if websocket.send(Message::Text(response.into())).is_err() {
break;
}
}
Err(_) => break,
}
}
}
Err(_) => {
let _ = websocket.close(None);
break;
}
}
}
});
}
}
Ntex
use ntex::service::fn_factory_with_config;
use std::io;
use ntex::web;
use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize)]
struct Msg {
num: i32,
}
async fn ws_service(
_sink: web::ws::WsSink,
) -> Result<impl ntex::service::Service<
web::ws::Frame,
Response = Option<web::ws::Message>,
Error = io::Error,
>, web::Error> {
let service = ntex::fn_service(move |frame: web::ws::Frame| async move {
match frame {
web::ws::Frame::Text(text) => {
let text_str = std::str::from_utf8(text.as_ref()).unwrap_or("");
match serde_json::from_str::<Msg>(text_str) {
Ok(msg) => {
let json_str = serde_json::to_string(&msg).unwrap();
Ok(Some(web::ws::Message::Text(json_str.into())))
}
Err(_) => Ok(None),
}
},
web::ws::Frame::Binary(bin) => Ok(Some(web::ws::Message::Binary(bin))),
web::ws::Frame::Close(reason) => Ok(Some(web::ws::Message::Close(reason))),
_ => Ok(None),
}
});
Ok(service)
}
async fn ws_index(req: web::HttpRequest) -> Result<web::HttpResponse, web::Error> {
web::ws::start(req, fn_factory_with_config(ws_service)).await
}
#[ntex::main]
async fn main() -> std::io::Result<()> {
println!("WebSocket server started on :9001");
web::server(|| {
web::App::new()
.service(
web::resource("/")
.route(web::get().to(ws_index))
)
})
.bind("127.0.0.1:9001")?
.run()
.await
}
Where the Go one use Gorilla Websocket, the code is like this
package main
import (
"fmt"
"github.com/gorilla/websocket"
"net/http"
)
type msg struct {
Num int `json:"num"`
}
var upgrader = websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
return true
},
}
func main() {
http.HandleFunc("/", wsHandler)
fmt.Println("WebSocket server started on :9001")
panic(http.ListenAndServe(":9001", nil))
}
func wsHandler(w http.ResponseWriter, r *http.Request) {
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
http.Error(w, "Could not open websocket connection", http.StatusBadRequest)
return
}
defer conn.Close()
go echo(conn)
}
func echo(conn *websocket.Conn) {
for {
var m msg
err := conn.ReadJSON(&m)
if err != nil {
fmt.Println("Error reading json:", err)
break
}
// fmt.Printf("Received message: %#v\n", m)
if err = conn.WriteJSON(m); err != nil {
fmt.Println("Error sending json:", err)
break
}
}
}
I benchmarked using K6 with the following simple config
import ws from 'k6/ws';
import { check } from 'k6';
export let options = {
vus: 100,
duration: '2s',
};
export default function () {
let url = 'ws://127.0.0.1:9001';
let res = ws.connect(url, function (socket) {
socket.on('open', function () {
socket.send(JSON.stringify({ num: 42 }));
});
socket.setTimeout(() => {
socket.close();
}, 5000);
});
check(res, { 'Connected successfully': (r) => r && r.status === 101 });
}
The Rust results are always not performant like this I dont know why
execution: local
script: ws.js
output: -
scenarios: (100.00%) 1 scenario, 100 max VUs, 32s max duration (incl. graceful stop):
* default: 100 looping VUs for 2s (gracefulStop: 30s)
✓ Connected successfully
checks................: 100.00% 100 out of 100
data_received.........: 14 kB 2.8 kB/s
data_sent.............: 22 kB 4.3 kB/s
iteration_duration....: avg=5s min=5s med=5s max=5.01s p(90)=5.01s p(95)=5.01s
iterations............: 100 19.929403/s
vus...................: 100 min=100 max=100
vus_max...............: 100 min=100 max=100
ws_connecting.........: avg=7.75ms min=305.79µs med=8.57ms max=15.46ms p(90)=12.24ms p(95)=15.04ms
ws_msgs_received......: 100 19.929403/s
ws_msgs_sent..........: 100 19.929403/s
ws_session_duration...: avg=5s min=5s med=5s max=5.01s p(90)=5.01s p(95)=5.01s
ws_sessions...........: 100 19.929403/s
Where the Go result is nice performant
execution: local
script: ws.js
output: -
scenarios: (100.00%) 1 scenario, 100 max VUs, 32s max duration (incl. graceful stop):
* default: 100 looping VUs for 2s (gracefulStop: 30s)
✓ Connected successfully
checks................: 100.00% 19995 out of 19995
data_received.........: 2.6 MB 1.3 MB/s
data_sent.............: 4.2 MB 2.1 MB/s
iteration_duration....: avg=9.95ms min=300.99µs med=8.54ms max=56.6ms p(90)=18.55ms p(95)=22.68ms
iterations............: 19995 9964.616943/s
vus...................: 100 min=100 max=100
vus_max...............: 100 min=100 max=100
ws_connecting.........: avg=8.99ms min=142.55µs med=7.79ms max=46.99ms p(90)=16.77ms p(95)=20.59ms
ws_msgs_received......: 11 5.48191/s
ws_msgs_sent..........: 19995 9964.616943/s
ws_session_duration...: avg=9.82ms min=257.29µs med=8.43ms max=56.55ms p(90)=18.36ms p(95)=22.51ms
ws_sessions...........: 19995 9964.616943/s
Seeking for any advise here please