Websocket server cannot process another connection until restarted

I implemented a websocket server using the soketto crate. I also use the tokio and sqlx crates there.
My issue is that I cannot use the server for more than one connection. It seems like something is blocking.
I also noticed that when I run another client instance, it freezes until I shut down the websocket server and run it again. So I need to run the client first and then run the server to let the server process the connection.

This is my server code:

use std::collections::BTreeMap;
use std::env;
use std::io::{Error, ErrorKind};
use std::sync::{Arc, Mutex as SyncMutex};
use std::time::Duration;
use soketto::{connection, handshake, BoxedError};
use tokio::net::{TcpListener, TcpStream, tcp::{OwnedReadHalf, OwnedWriteHalf}};
use tokio::sync::{Mutex, mpsc::{self, Receiver, Sender}};
use tokio::task::JoinHandle;
use tokio_stream::{wrappers::TcpListenerStream, StreamExt};
use tokio_util::compat::{Compat, TokioAsyncReadCompatExt};
use futures::io::{BufReader, BufWriter};
use futures::future::join_all;
use dotenvy::dotenv;
use json::{JsonValue};
use tokio::time::sleep;

use crate::db::{DB, models::player::Player};

const DB_SYNC_TIMEOUT: u64 = 250;

pub struct Server {
    _db: Arc<Mutex<DB>>,
    players_map: Arc<SyncMutex<BTreeMap<u64, Player>>>,
}

impl Server {
    pub async fn new() -> Self {
        let db = DB::new().await;

        Self {
            _db: Arc::new(Mutex::new(db)),
            players_map: Arc::new(SyncMutex::new(BTreeMap::new())),
        }
    }

    pub async fn run(&mut self) {
        dotenv().ok();

        let server_url = env::var("SERVER_URL").expect("SERVER_URL must be set");
        let listener = TcpListener::bind(server_url).await.unwrap();
        let incoming = TcpListenerStream::new(listener);

        let (player_sender, player_receiver) = mpsc::channel::<Player>(100);

        join_all(vec![
            self.handle_connection(incoming, player_sender),
            self.sync_db(player_receiver).await,
        ]).await;
    }

    async fn sync_db(&mut self, mut player_receiver: Receiver<Player>) -> JoinHandle<()> {
        let db = Arc::clone(&self._db);

        tokio::task::spawn(async move {
            loop {
                if let Some(player) = player_receiver.recv().await {
                    if let Err(err) = db.lock().await.player_query().insert(player).await {
                        // ...
                    }
                }
                sleep(Duration::from_millis(DB_SYNC_TIMEOUT)).await;
            }
        })
    }

    fn handle_connection(
        &mut self,
        mut incoming: TcpListenerStream,
        mut player_sender: Sender<Player>
    ) -> JoinHandle<()> {
        let players_map = Arc::clone(&self.players_map);

        tokio::task::spawn(async move {
            while let Some(socket) = incoming.next().await {
                let mut server = Self::new_server(socket.unwrap());
                let key = {
                    let req = server.receive_request().await.unwrap();
                    req.key()
                };
                let accept = handshake::server::Response::Accept { key, protocol: None };
                server.send_response(&accept).await.unwrap();
                let (mut sender, mut receiver) = server.into_builder().finish();
                let mut message = Vec::new();

                loop {
                    message.clear();
                    match receiver.receive_data(&mut message).await {
                        Ok(soketto::Data::Binary(n)) => {
                            assert_eq!(n, message.len());
                            sender.send_binary_mut(&mut message).await.unwrap();
                            sender.flush().await.unwrap()
                        }
                        Ok(soketto::Data::Text(n)) => {
                            assert_eq!(n, message.len());
                            if let Ok(req) = std::str::from_utf8(&message) {
                                let response = Self::handle_request(
                                    req,
                                    Arc::clone(&players_map),
                                    player_sender.clone(),
                                ).await;

                                sender.send_text(response).await.unwrap();
                                sender.flush().await.unwrap()
                            } else {
                                break;
                            }
                        }
                        Err(connection::Error::Closed) => break,
                        Err(e) => {
                            println!("connection error: {}", e);
                            break;
                        }
                    }
                }
            }
        })
    }

    async fn handle_request(
        request: &str,
        players_map: Arc<SyncMutex<BTreeMap<u64, Player>>>,
        player_sender: Sender<Player>,
    ) -> String {
        let request = json::parse(request);

        let mut message = json::stringify(JsonValue::Null);

        match request {
            Ok(result) => {
                let event = match result["event"].as_str() {
                    Some(event) => Some(event),
                    _ => None,
                };

                let guid = match result["payload"]["guid"].as_u64() {
                    Some(guid) => Some(guid),
                    _ => None,
                };

                let name = match result["payload"]["name"].as_str() {
                    Some(name) => Some(name),
                    _ => None,
                };

                let players = {
                    let guard = players_map.lock().unwrap();
                    guard.clone()
                };

                if let Some(event) = event {
                    match event {
                        "players.get" if guid.is_some() => {
                            if let Some(player) = players.get(&guid.unwrap()) {
                                message = json::stringify(JsonValue::Object(player.into()));
                            }
                        },
                        "players.all" => {
                            message = json::stringify(
                                JsonValue::Object(
                                    players
                                        .into_iter()
                                        .map(|(key, value)| (key.to_string(), value))
                                        .collect()
                                )
                            );
                        },
                        "players.sync" if guid.is_some() && name.is_some() => {
                            let guid = guid.unwrap();
                            if players.get(&guid).is_none() {
                                let name = name.unwrap();

                                let player = Player {
                                    guid,
                                    name: String::from(name),
                                };
                                if let Ok(_) = player_sender.send(player.clone()).await {
                                    { players_map.lock().unwrap().insert(guid, player); }

                                    message = json::stringify(
                                        JsonValue::Boolean(true),
                                    );
                                }
                            }
                        },
                        _ => {},
                    }
                }
            },
            Err(err) => {},
        };

        message
    }

    fn new_server<'a>(socket: TcpStream) -> handshake::Server<'a, BufReader<BufWriter<Compat<TcpStream>>>> {
        handshake::Server::new(BufReader::new(BufWriter::new(socket.compat())))
    }
}

This is the websocket client (for the client I use the ws crate):

use std::cell::RefCell;
use std::rc::Rc;
use std::env;
use dotenvy::dotenv;
use ws::{CloseCode, connect, Message};

pub mod query;

pub struct WSClient {
    _url: String,
}

impl WSClient {
    pub fn new() -> Self {
        dotenv().ok();

        let server_url = env::var("SERVER_URL").expect("SERVER_URL must be set");

        Self {
            _url: format!("ws://{}", server_url),
        }
    }

    pub fn query(&mut self, data: &str) -> String {
        let result = Rc::new(RefCell::new(WSQueryResult::new()));

        connect(self._url.to_string(), |out| {
            let result = Rc::clone(&result);

            if out.send(Message::from(data)).is_err() {
                println!("Websocket couldn't queue an initial message.")
            }

            move |msg: Message| {
                result.as_ref().borrow_mut().data = Some(msg.into_text().unwrap());
                out.close(CloseCode::Normal)
            }
        }).ok();

        return match result.as_ref().borrow_mut().data.take() {
            Some(text) => text,
            _ => String::from(""),
        };
    }
}

#[derive(Clone)]
pub struct WSQueryResult {
    pub data: Option<String>,
}

impl WSQueryResult {
    pub fn new() -> Self {
        Self {
            data: None,
        }
    }
}

And this is how it is used on the client:

pub fn get_player_name(id: u64) -> Result<String, Error> {
    let query = Query {
        event: String::from("players.get"),
        payload: QueryPayload {
            id,
            name: None,
        }
    };

    let response = WSClient::new().query(&json::stringify(query));
    if response == "null" {
        return Err(
            Error::new(
                ErrorKind::NotFound,
                format!("Player with id {} not found in db", id)
            )
        );
    }

    return match json::parse(&response) {
        Ok(data) => {
            if data.contains("name") {
                let name = String::from(data["name"].as_str().unwrap());
                return Ok(name);
            }
            Err(Error::new(ErrorKind::InvalidData, "JSON response not contains name!"))
        },
        _ => Err(Error::new(ErrorKind::Other, "Cannot parse JSON.")),
    }
}

pub fn sync_player(id: u64, name: String) -> Result<(), Error> {
    if name.is_empty() {
        return Err(Error::new(ErrorKind::InvalidInput, "Name should be not empty string."));
    }

    let query = Query {
        event: String::from("players.sync"),
        payload: QueryPayload {
            id,
            name: Some(name),
        }
    };

    let response = WSClient::new().query(&json::stringify(query));
    if response == "null" {
        return Err(Error::new(ErrorKind::Other, "Player was not sync!"));
    } else if response == "true" {
        return Ok(());
    }

    Err(Error::new(ErrorKind::InvalidData, format!("Wrong response! Reason: {}", &response)))
}

Could somebody explain what is wrong with my server? Why do I need to shut down the server and run it again before another client can connect?

Well, it seems like it's blocked on the client side. I just tried closing the old connections and starting new ones without shutting down the server. But it's still not clear what the blocker could be.

After I closed the old connections and started new ones, the server printed some output and crashed with this error:

thread 'tokio-runtime-worker' panicked at 'called `Result::unwrap()` on an `Err` value: Io(Kind(UnexpectedEof))', src\server\mod.rs:80:62

It's on this line:

let key = {
	let req = server.receive_request().await.unwrap();
	req.key()
};

So the server is probably also doing something wrong.
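
One thing the posted server code shows is that the handshake and the whole receive loop run inside the single accept task, so the next `incoming.next()` is not polled until the current client disconnects, and a panic from `unwrap()` ends that accept task for good. Below is a minimal sketch of the alternative, one task per connection. It uses `std::thread` as a stand-in for `tokio::task::spawn` so that it runs without extra crates; `handle_client` and `serve` are hypothetical names, and the string messages stand in for websocket frames.

```rust
use std::thread;

/// Hypothetical stand-in for one client session. Like the `unwrap()` in the
/// posted handshake code, it panics when the client sent nothing at all.
fn handle_client(id: usize, messages: Vec<String>) -> Vec<String> {
    assert!(!messages.is_empty(), "client {} closed before the handshake", id);
    messages
        .into_iter()
        .map(|m| format!("client {id}: {m}"))
        .collect()
}

/// Accept-loop sketch: every connection gets its own thread, so the loop
/// moves on at once and a panic stays confined to that one connection.
fn serve(connections: Vec<Vec<String>>) -> Vec<Vec<String>> {
    let handles: Vec<_> = connections
        .into_iter()
        .enumerate()
        .map(|(id, messages)| thread::spawn(move || handle_client(id, messages)))
        .collect();

    // A panicked session shows up as `Err` on join; map it to an empty reply list.
    handles
        .into_iter()
        .map(|handle| handle.join().unwrap_or_default())
        .collect()
}
```

Calling `serve` with three sessions where the middle one is empty lets that session panic while the other two still return their replies. In the real server the same shape would presumably be one `tokio::task::spawn` per accepted socket, with the receive loop moved into the spawned task.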

The client implementation from soketto works. Thanks to all!

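use soketto::handshake::{Client, ServerResponse};
// Needed for `.compat()` on the tokio `TcpStream` below.
use tokio_util::compat::TokioAsyncReadCompatExt;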

// First, we need to establish a TCP connection.
let socket = tokio::net::TcpStream::connect("...").await?;

// Then we configure the client handshake.
let mut client = Client::new(socket.compat(), "...", "/");

// And finally we perform the handshake and handle the result.
let (mut sender, mut receiver) = match client.handshake().await? {
    ServerResponse::Accepted { .. } => client.into_builder().finish(),
    ServerResponse::Redirect { status_code, location } => unimplemented!("follow location URL"),
    ServerResponse::Rejected { status_code } => unimplemented!("handle failure")
};

// Over the established websocket connection we can send
sender.send_text("some text").await?;
sender.send_text("some more text").await?;
sender.flush().await?;

// ... and receive data.
let mut data = Vec::new();
receiver.receive_data(&mut data).await?;

That error tells you that the server failed because the client closed the connection unexpectedly: the server was sitting in server.receive_request when the client closed the connection (instead of sending whatever data receive_request expects the client to send). This is almost certainly not what you want, which implies that you shouldn't be using unwrap() there, but should be using a better form of error handling.
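
As a rough illustration of what "a better form of error handling" could look like, here is a minimal sketch that propagates a failed handshake with `?` and logs it in the accept loop instead of panicking. It is not soketto code: `receive_request` here is a hypothetical synchronous stand-in for `server.receive_request().await`, the string input stands in for the socket, and `handle_connection` and `accept_all` are illustrative names.

```rust
use std::io::{Error, ErrorKind};

/// Hypothetical stand-in for `server.receive_request().await`: yields the
/// handshake key, or an I/O error if the client hung up first.
fn receive_request(raw: &str) -> Result<String, Error> {
    if raw.is_empty() {
        return Err(Error::new(
            ErrorKind::UnexpectedEof,
            "client closed the connection",
        ));
    }
    Ok(raw.to_uppercase())
}

/// Handles one connection; `?` hands a failed handshake back to the caller
/// instead of panicking the way `unwrap()` does.
fn handle_connection(raw: &str) -> Result<String, Error> {
    let key = receive_request(raw)?;
    Ok(format!("accepted {key}"))
}

/// Accept-loop sketch: a failed connection is logged and skipped, and the
/// loop carries on with the next one.
fn accept_all(incoming: &[&str]) -> Vec<String> {
    let mut replies = Vec::new();
    for raw in incoming {
        match handle_connection(raw) {
            Ok(reply) => replies.push(reply),
            Err(err) => eprintln!("connection error: {err}"),
        }
    }
    replies
}
```

With this shape, a client that disconnects before sending its handshake produces one log line and the loop keeps accepting. The same pattern should carry over to the async code by returning a `Result` from the per-connection future and matching on it where the connection is driven.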
