I'm using tokio-tungstenite for a WebSocket communication. I'm using serde_json at the server to deserialize it. This is my client's message in TypeScript, serialized via simply JSON.stringify:
// ...
let msg = msg.into_text();
let Ok(msg) = msg.map(|msg| serde_json::from_str::<PeerClientMessage>(msg.as_str())) else {
unreachable!();
};
let msg = msg.unwrap();
match msg {
// ...
The problem is within the line before the match. I get a panic there.
Am I doing anything wrong?
I look to get a panic in a little occasion, but it looks to work regardless...
Incoming TCP connection from 127.0.0.1:51709
WebSocket connection established: 127.0.0.1:51709
thread 'tokio-runtime-worker' panicked at src\main.rs:87:23:
called `Result::unwrap()` on an `Err` value: Error("EOF while parsing a value", line: 1, column: 0)
thread 'tokio-runtime-worker' panicked at src\main.rs:87:23:
called `Result::unwrap()` on an `Err` value: Error("EOF while parsing a value", line: 1, column: 0)
Incoming TCP connection from 127.0.0.1:51721
WebSocket connection established: 127.0.0.1:51721
Here is the handler for messages coming from the client to the server itself:
async fn handle_connection(peers: Peers, raw_stream: TcpStream, address: SocketAddr, level: Arc<Mutex<Level>>) {
println!("Incoming TCP connection from {address}");
let websocket_stream = tokio_tungstenite::accept_async(raw_stream)
.await
.expect("Error during the websocket handshake occurred");
println!("WebSocket connection established: {address}");
// Insert the write part of this peer into the peer map
let (sender, rx) = unbounded();
peers.lock().unwrap().insert(address, Peer::new(sender));
let (outgoing, incoming) = websocket_stream.split();
let broadcast_incoming = incoming.try_for_each(|msg| {
// The message is handled here.
// ...
});
// ...
}
I just took it from the examples and changed it a bit
I've another related problem... It seems like I don't receive the world message:
// level/level.rs: Level
// ...
pub fn next_frame(&mut self, delta: Duration, peers: Peers) {
// Send the world to every peer
let world = self.world(Arc::clone(&peers));
let world_serialized = serde_json::to_string(&world).unwrap();
for (_, peer) in peers.lock().unwrap().iter() {
peer.sender.unbounded_send(Message::Text(world_serialized.clone())).unwrap();
}
// Step physics simulation forward.
// ...
Any idea? This is where the frame interval lies:
async fn run_server(address: String) {
let listener = TcpListener::bind(address.as_str()).await.unwrap();
println!("Server running at {address}");
let peers = Arc::new(Mutex::new(HashMap::new()));
let mut level = Arc::new(Mutex::new(Level::new()));
let mut interval = interval(FRAME_DURATION);
tokio::spawn({
let level = Arc::clone(&mut level);
let peers = Arc::clone(&peers);
async move {
let tick = interval.tick().await;
level.lock().unwrap().next_frame(tick.elapsed(), peers);
}
});
// Peer connections
while let Ok((stream, address)) = listener.accept().await {
tokio::spawn(handle_connection(peers.clone(), stream, address, Arc::clone(&level)));
}
}
I've done this instead of using .into_text(). It looks to work, but I've two problems currently:
The game's world is not being sent to the client (as in my previous post)
Once a peer connects and disconnects, any further connection of the same peer will keep going on forever. It doesn't even reach the first println anymore in handle_connection.
let broadcast_incoming = incoming.try_for_each(|msg| {
// Received a message from `address`
use tokio_tungstenite::tungstenite::Message;
let Message::Text(msg) = msg else {
return future::ok(());
};
// ...
I'm using just the browser's WebSocket for connecting to the server.
Here's the full function for handling peers:
async fn handle_connection(peers: Peers, raw_stream: TcpStream, address: SocketAddr, level: Arc<Mutex<Level>>) {
println!("Incoming TCP connection from {address}");
let websocket_stream = tokio_tungstenite::accept_async(raw_stream)
.await
.expect("Error during the websocket handshake occurred");
println!("WebSocket connection established: {address}");
// Insert the write part of this peer into the peer map
let (sender, receiver) = unbounded();
peers.lock().unwrap().insert(address, Peer::new(sender));
let (outgoing, incoming) = websocket_stream.split();
let broadcast_incoming = incoming.try_for_each(|msg| {
// Received a message from `address`
use tokio_tungstenite::tungstenite::Message;
let Message::Text(msg) = msg else {
return future::ok(());
};
let msg = serde_json::from_str::<PeerClientMessage>(msg.as_str()).unwrap();
match msg {
PeerClientMessage::Spawn => {
let rigid_body_handle = level.lock().unwrap().spawn_car();
let peer = &peers.lock().unwrap()[&address];
peer.car.rigid_body_handle.set(Some(rigid_body_handle));
},
PeerClientMessage::MovingDirections { up, down, left, right } => {
let mut peers = peers.lock().unwrap();
let peer = peers.get_mut(&address).unwrap();
peer.car.moving = up || down || left || right;
println!("{}", peer.car.moving);
let next_direction = CarDirection::from_arrows(left, right, up, down);
if next_direction.is_some() && peer.car.direction != next_direction.unwrap() {
let next_direction = next_direction.unwrap();
peer.car.direction = next_direction;
peer.car.turn.turn_start(next_direction);
}
},
}
future::ok(())
});
let receive_from_others = receiver.map(Ok).forward(outgoing);
pin_mut!(broadcast_incoming, receive_from_others);
future::select(broadcast_incoming, receive_from_others).await;
// disconnect
let peer = &peers.lock().unwrap()[&address];
if let Some(rigid_body_handle) = peer.car.rigid_body_handle.get() {
level.lock().unwrap().remove_rigid_body(rigid_body_handle);
}
peers.lock().unwrap().remove(&address);
println!("{address} disconnected");
}
Here is the function where the server is established:
async fn run_server(address: String) {
let listener = TcpListener::bind(address.as_str()).await.unwrap();
println!("Server running at {address}");
let peers = Arc::new(Mutex::new(HashMap::new()));
let mut level = Arc::new(Mutex::new(Level::new()));
let mut interval = interval(FRAME_DURATION);
tokio::spawn({
let level = Arc::clone(&mut level);
let peers = Arc::clone(&peers);
async move {
let tick = interval.tick().await;
level.lock().unwrap().next_frame(tick.elapsed(), peers);
}
});
// Peer connections
while let Ok((stream, address)) = listener.accept().await {
tokio::spawn(handle_connection(peers.clone(), stream, address, Arc::clone(&level)));
}
}