Panic when parsing client messages with tokio-tungstenite

I'm using tokio-tungstenite for a WebSocket communication. I'm using serde_json at the server to deserialize it. This is my client's message in TypeScript, serialized via simply JSON.stringify:

// ... in Socket.ts
    send(message: SocketClientMessage) {
        this._socket?.send(JSON.stringify(message));
    }
// ...

export type SocketClientMessage =
    | { type: "Spawn" }
    | {
        type: "MovingDirections",
        up: boolean,
        down: boolean,
        left: boolean,
        right: boolean,
    };

This is the client's message in Rust:

#[derive(Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum PeerClientMessage {
    Spawn,
    MovingDirections { up: bool, down: bool, left: bool, right: bool },
}

Deserialized as follows:

// ...
let msg = msg.into_text();
let Ok(msg) = msg.map(|msg| serde_json::from_str::<PeerClientMessage>(msg.as_str())) else {
    unreachable!();
};
let msg = msg.unwrap();
match msg {
// ...

The problem is within the line before the match. I get a panic there.

Am I doing anything wrong?

I look to get a panic in a little occasion, but it looks to work regardless...

Incoming TCP connection from 127.0.0.1:51709
WebSocket connection established: 127.0.0.1:51709
thread 'tokio-runtime-worker' panicked at src\main.rs:87:23:
called `Result::unwrap()` on an `Err` value: Error("EOF while parsing a value", line: 1, column: 0)
thread 'tokio-runtime-worker' panicked at src\main.rs:87:23:
called `Result::unwrap()` on an `Err` value: Error("EOF while parsing a value", line: 1, column: 0)
Incoming TCP connection from 127.0.0.1:51721
WebSocket connection established: 127.0.0.1:51721

You will need to show us what are you doing before let msg = msg.into_text(); to understand what's really happening.

1 Like
Error("EOF while parsing a value", line: 1, column: 0)
//                                 ^^^^^^^^^^^^^^^^^^

That indicates an empty string to me.

1 Like

Here is the handler for messages coming from the client to the server itself:

async fn handle_connection(peers: Peers, raw_stream: TcpStream, address: SocketAddr, level: Arc<Mutex<Level>>) {
    println!("Incoming TCP connection from {address}");
    let websocket_stream = tokio_tungstenite::accept_async(raw_stream)
        .await
        .expect("Error during the websocket handshake occurred");
    println!("WebSocket connection established: {address}");

    // Insert the write part of this peer into the peer map
    let (sender, rx) = unbounded();
    peers.lock().unwrap().insert(address, Peer::new(sender));

    let (outgoing, incoming) = websocket_stream.split();

    let broadcast_incoming = incoming.try_for_each(|msg| {
        // The message is handled here.
        // ...
    });

    // ...
}

I just took it from the examples and changed it a bit

I've another related problem... It seems like I don't receive the world message:

// level/level.rs: Level
// ...
    pub fn next_frame(&mut self, delta: Duration, peers: Peers) {
        // Send the world to every peer
        let world = self.world(Arc::clone(&peers));
        let world_serialized = serde_json::to_string(&world).unwrap();
        for (_, peer) in peers.lock().unwrap().iter() {
            peer.sender.unbounded_send(Message::Text(world_serialized.clone())).unwrap();
        }

        // Step physics simulation forward.
        // ...

Any idea? This is where the frame interval lies:

async fn run_server(address: String) {
    let listener = TcpListener::bind(address.as_str()).await.unwrap();
    println!("Server running at {address}");

    let peers = Arc::new(Mutex::new(HashMap::new()));
    let mut level = Arc::new(Mutex::new(Level::new()));
    let mut interval = interval(FRAME_DURATION);

    tokio::spawn({
        let level = Arc::clone(&mut level);
        let peers = Arc::clone(&peers);
        async move {
            let tick = interval.tick().await;
            level.lock().unwrap().next_frame(tick.elapsed(), peers);
        }
    });

    // Peer connections
    while let Ok((stream, address)) = listener.accept().await {
        tokio::spawn(handle_connection(peers.clone(), stream, address, Arc::clone(&level)));
    }
}

Here's the related client code in TypeScript:

export default class Socket {
    readonly onMessage = new EventEmitter<SocketServerMessage>();
    private _socket: WebSocket | undefined = undefined;

    /**
     * @throws {SocketConnectError}
     */
    connect(): Promise<void> {
        if (this._socket != undefined) {
            this._socket!.close();
            this._socket = undefined;
        }
        return new Promise<void>((resolve, reject) => {
            this._socket = new WebSocket('ws://127.0.0.1:8080');
            this._socket.addEventListener('open', _ => {
                resolve(undefined);
            });
            this._socket.addEventListener('error', _ => {
                reject(new SocketConnectError("Failed to connect to the WebSocket server."));
            });
            this._socket.addEventListener('message', event => {
                if (typeof event.data === "string") {
                    this.onMessage.emit(JSON.parse(event.data));
                }
            });
        });
    }

    send(message: SocketClientMessage) {
        this._socket?.send(JSON.stringify(message));
    }

    disconnect() {
        this._socket?.close();
        this._socket = undefined;
    }
}

Level.ts:

export default class Level {
    private _displaying: boolean = false;
    private _resizeListener: any = undefined;
    private _application: pixi.Application | undefined = undefined;
    private _displayVariables: LevelDisplayVariables | undefined = undefined;
    private _socket: Socket = new Socket();

    constructor() {
        let counter = 0;
        this._socket.onMessage.listen(message => {
            if (message.type == "World") {
                const pixiStage = this._application!.stage;
                pixiStage.removeChildren();
                const { world } = message;
                for (const entity of world.entities) {
                    if (entity.type == "Car") {
                        const orient = (sprite: pixi.Sprite) => {
                            sprite.anchor.set(0.5);
                            sprite.position.set(entity.x, entity.y);
                            sprite.rotation = entity.rotationRadians;
                            if (counter == 0) {
                                console.log(`(${entity.x}, ${entity.y})`);
                            }
                            counter += 1;
                            counter %= 1_000;
                        };
                        const shadow = new pixi.Sprite(pixi.Texture.from("assets/carShadow/car-shadow.png"));
                        shadow.scale.set(1.3, 1.6);
                        const car = new pixi.Sprite(pixi.Texture.from("assets/carSkins/porsche-carrera-gt.png"));
                        pixiStage.addChild(shadow);
                        pixiStage.addChild(car);
                        orient(shadow);
                        orient(car);
                    }
                }
            }
        });
    }
// ...

Logs nothing!

I think you need to be more mindful about how you handle different types of Messages. Filter on text (and binary?) messages, maybe.

Looking here, and assuming it turned into an empty string, the message you got could have been

  • A Close with no reason
  • A Close with an empty reason
  • A Ping, Pong, Binary, or Frame message that converts into the empty string
  • An empty Text message
1 Like

I've done this instead of using .into_text(). It looks to work, but I've two problems currently:

  • The game's world is not being sent to the client (as in my previous post)
  • Once a peer connects and disconnects, any further connection of the same peer will keep going on forever. It doesn't even reach the first println anymore in handle_connection.
let broadcast_incoming = incoming.try_for_each(|msg| {
    // Received a message from `address`
    use tokio_tungstenite::tungstenite::Message;

    let Message::Text(msg) = msg else {
        return future::ok(());
    };
// ...

I'm using just the browser's WebSocket for connecting to the server.

Here's the full function for handling peers:

async fn handle_connection(peers: Peers, raw_stream: TcpStream, address: SocketAddr, level: Arc<Mutex<Level>>) {
    println!("Incoming TCP connection from {address}");
    let websocket_stream = tokio_tungstenite::accept_async(raw_stream)
        .await
        .expect("Error during the websocket handshake occurred");
    println!("WebSocket connection established: {address}");

    // Insert the write part of this peer into the peer map
    let (sender, receiver) = unbounded();
    peers.lock().unwrap().insert(address, Peer::new(sender));

    let (outgoing, incoming) = websocket_stream.split();

    let broadcast_incoming = incoming.try_for_each(|msg| {
        // Received a message from `address`
        use tokio_tungstenite::tungstenite::Message;

        let Message::Text(msg) = msg else {
            return future::ok(());
        };
        let msg = serde_json::from_str::<PeerClientMessage>(msg.as_str()).unwrap();
        match msg {
            PeerClientMessage::Spawn => {
                let rigid_body_handle = level.lock().unwrap().spawn_car();
                let peer = &peers.lock().unwrap()[&address];
                peer.car.rigid_body_handle.set(Some(rigid_body_handle));
            },
            PeerClientMessage::MovingDirections { up, down, left, right } => {
                let mut peers = peers.lock().unwrap();
                let peer = peers.get_mut(&address).unwrap();
                peer.car.moving = up || down || left || right;
                println!("{}", peer.car.moving);

                let next_direction = CarDirection::from_arrows(left, right, up, down);
                if next_direction.is_some() && peer.car.direction != next_direction.unwrap() {
                    let next_direction = next_direction.unwrap();
                    peer.car.direction = next_direction;
                    peer.car.turn.turn_start(next_direction);
                }
            },
        }

        future::ok(())
    });

    let receive_from_others = receiver.map(Ok).forward(outgoing);

    pin_mut!(broadcast_incoming, receive_from_others);
    future::select(broadcast_incoming, receive_from_others).await;

    // disconnect
    let peer = &peers.lock().unwrap()[&address];
    if let Some(rigid_body_handle) = peer.car.rigid_body_handle.get() {
        level.lock().unwrap().remove_rigid_body(rigid_body_handle);
    }

    peers.lock().unwrap().remove(&address);
    println!("{address} disconnected");
}

Here is the function where the server is established:

async fn run_server(address: String) {
    let listener = TcpListener::bind(address.as_str()).await.unwrap();
    println!("Server running at {address}");

    let peers = Arc::new(Mutex::new(HashMap::new()));
    let mut level = Arc::new(Mutex::new(Level::new()));
    let mut interval = interval(FRAME_DURATION);

    tokio::spawn({
        let level = Arc::clone(&mut level);
        let peers = Arc::clone(&peers);
        async move {
            let tick = interval.tick().await;
            level.lock().unwrap().next_frame(tick.elapsed(), peers);
        }
    });

    // Peer connections
    while let Ok((stream, address)) = listener.accept().await {
        tokio::spawn(handle_connection(peers.clone(), stream, address, Arc::clone(&level)));
    }
}