I implemented websocket server using soketto
crate. Also I use tokio-rs
crate and sqlx
crate there.
My issue is that I cannot use the server for more than one connection. Seems like something blocked.
Also I noticed that when I run another client instance, it freeze until I shutdown the websocket server and run it again. So I need to run client first and then run server to let server process connection.
This is my server code:
use std::collections::BTreeMap;
use std::env;
use std::io::{Error, ErrorKind};
use std::sync::{Arc, Mutex as SyncMutex};
use std::time::Duration;
use soketto::{connection, handshake, BoxedError};
use tokio::net::{TcpListener, TcpStream, tcp::{OwnedReadHalf, OwnedWriteHalf}};
use tokio::sync::{Mutex, mpsc::{self, Receiver, Sender}};
use tokio::task::JoinHandle;
use tokio_stream::{wrappers::TcpListenerStream, StreamExt};
use tokio_util::compat::{Compat, TokioAsyncReadCompatExt};
use futures::io::{BufReader, BufWriter};
use futures::future::join_all;
use dotenvy::dotenv;
use json::{JsonValue};
use tokio::time::sleep;
use crate::db::{DB, models::player::Player};
const DB_SYNC_TIMEOUT: u64 = 250;
pub struct Server {
_db: Arc<Mutex<DB>>,
players_map: Arc<SyncMutex<BTreeMap<u64, Player>>>,
}
impl Server {
pub async fn new() -> Self {
let db = DB::new().await;
Self {
_db: Arc::new(Mutex::new(db)),
players_map: Arc::new(SyncMutex::new(BTreeMap::new())),
}
}
pub async fn run(&mut self) {
dotenv().ok();
let server_url = env::var("SERVER_URL").expect("SERVER_URL must be set");
let listener = TcpListener::bind(server_url).await.unwrap();
let incoming = TcpListenerStream::new(listener);
let (player_sender, player_receiver) = mpsc::channel::<Player>(100);
join_all(vec![
self.handle_connection(incoming, player_sender),
self.sync_db(player_receiver).await,
]).await;
}
async fn sync_db(&mut self, mut player_receiver: Receiver<Player>) -> JoinHandle<()> {
let db = Arc::clone(&self._db);
tokio::task::spawn(async move {
loop {
if let Some(player) = player_receiver.recv().await {
if let Err(err) = db.lock().await.player_query().insert(player).await {
// ...
}
}
sleep(Duration::from_millis(DB_SYNC_TIMEOUT)).await;
}
})
}
fn handle_connection(
&mut self,
mut incoming: TcpListenerStream,
mut player_sender: Sender<Player>
) -> JoinHandle<()> {
let players_map = Arc::clone(&self.players_map);
tokio::task::spawn(async move {
while let Some(socket) = incoming.next().await {
let mut server = Self::new_server(socket.unwrap());
let key = {
let req = server.receive_request().await.unwrap();
req.key()
};
let accept = handshake::server::Response::Accept { key, protocol: None };
server.send_response(&accept).await.unwrap();
let (mut sender, mut receiver) = server.into_builder().finish();
let mut message = Vec::new();
loop {
message.clear();
match receiver.receive_data(&mut message).await {
Ok(soketto::Data::Binary(n)) => {
assert_eq!(n, message.len());
sender.send_binary_mut(&mut message).await.unwrap();
sender.flush().await.unwrap()
}
Ok(soketto::Data::Text(n)) => {
assert_eq!(n, message.len());
if let Ok(req) = std::str::from_utf8(&message) {
let response = Self::handle_request(
req,
Arc::clone(&players_map),
player_sender.clone(),
).await;
sender.send_text(response).await.unwrap();
sender.flush().await.unwrap()
} else {
break;
}
}
Err(connection::Error::Closed) => break,
Err(e) => {
println!("connection error: {}", e);
break;
}
}
}
}
})
}
async fn handle_request(
request: &str,
players_map: Arc<SyncMutex<BTreeMap<u64, Player>>>,
player_sender: Sender<Player>,
) -> String {
let request = json::parse(request);
let mut message = json::stringify(JsonValue::Null);
match request {
Ok(result) => {
let event = match result["event"].as_str() {
Some(event) => Some(event),
_ => None,
};
let guid = match result["payload"]["guid"].as_u64() {
Some(guid) => Some(guid),
_ => None,
};
let name = match result["payload"]["name"].as_str() {
Some(name) => Some(name),
_ => None,
};
let players = {
let guard = players_map.lock().unwrap();
guard.clone()
};
if let Some(event) = event {
match event {
"players.get" if guid.is_some() => {
if let Some(player) = players.get(&guid.unwrap()) {
message = json::stringify(JsonValue::Object(player.into()));
}
},
"players.all" => {
message = json::stringify(
JsonValue::Object(
players
.into_iter()
.map(|(key, value)| (key.to_string(), value))
.collect()
)
);
},
"players.sync" if guid.is_some() && name.is_some() => {
let guid = guid.unwrap();
if players.get(&guid).is_none() {
let name = name.unwrap();
let player = Player {
guid,
name: String::from(name),
};
if let Ok(_) = player_sender.send(player.clone()).await {
{ players_map.lock().unwrap().insert(guid, player); }
message = json::stringify(
JsonValue::Boolean(true),
);
}
}
},
_ => {},
}
}
},
Err(err) => {},
};
message
}
fn new_server<'a>(socket: TcpStream) -> handshake::Server<'a, BufReader<BufWriter<Compat<TcpStream>>>> {
handshake::Server::new(BufReader::new(BufWriter::new(socket.compat())))
}
}
This is websocket client (for client I use ws
crate):
use std::cell::RefCell;
use std::rc::Rc;
use std::env;
use dotenvy::dotenv;
use ws::{CloseCode, connect, Message};
pub mod query;
pub struct WSClient {
_url: String,
}
impl WSClient {
pub fn new() -> Self {
dotenv().ok();
let server_url = env::var("SERVER_URL").expect("SERVER_URL must be set");
Self {
_url: format!("ws://{}", server_url),
}
}
pub fn query(&mut self, data: &str) -> String {
let result = Rc::new(RefCell::new(WSQueryResult::new()));
connect(self._url.to_string(), |out| {
let result = Rc::clone(&result);
if out.send(Message::from(data)).is_err() {
println!("Websocket couldn't queue an initial message.")
}
move |msg: Message| {
result.as_ref().borrow_mut().data = Some(msg.into_text().unwrap());
out.close(CloseCode::Normal)
}
}).ok();
return match result.as_ref().borrow_mut().data.take() {
Some(text) => text,
_ => String::from(""),
};
}
}
#[derive(Clone)]
pub struct WSQueryResult {
pub data: Option<String>,
}
impl WSQueryResult {
pub fn new() -> Self {
Self {
data: None,
}
}
}
and usage on client:
pub fn get_player_name(id: u64) -> Result<String, Error> {
let query = Query {
event: String::from("players.get"),
payload: QueryPayload {
id,
name: None,
}
};
let response = WSClient::new().query(&json::stringify(query));
if response == "null" {
return Err(
Error::new(
ErrorKind::NotFound,
format!("Player with id {} not found in db", id)
)
);
}
return match json::parse(&response) {
Ok(data) => {
if data.contains("name") {
let name = String::from(data["name"].as_str().unwrap());
return Ok(name);
}
Err(Error::new(ErrorKind::InvalidData, "JSON response not contains name!"))
},
_ => Err(Error::new(ErrorKind::Other, "Cannot parse JSON.")),
}
}
pub fn sync_player(id: u64, name: String) -> Result<(), Error> {
if name.is_empty() {
return Err(Error::new(ErrorKind::InvalidInput, "Name should be not empty string."));
}
let query = Query {
event: String::from("players.sync"),
payload: QueryPayload {
id,
name: Some(name),
}
};
let response = WSClient::new().query(&json::stringify(query));
if response == "null" {
return Err(Error::new(ErrorKind::Other, "Player was not sync!"));
} else if response == "true" {
return Ok(());
}
Err(Error::new(ErrorKind::InvalidData, format!("Wrong response! Reason: {}", &response)))
}
Could somebody explain what is wrong with my server ? Why to let another client connect I need first shutdown the server and run it again ?