[tokio_tungstenite] Async Game Server Design

As a hobby project I'm trying to design an online multiplayer game server. Lots of interesting engineering challenges, but there are a few concepts that I can't figure out.

Intro/Background

The Game

It's called Dai Di (Big Two), a card-shedding game for 4 players. The deck is dealt out equally between the players, who take turns making plays (or passing). The first player to get rid of all of their cards wins.

The Goals

  • Browser based client
  • WebSockets for communication (I considered WebRTC, but it seems overly complex for this use case)
  • WASM to allow code reuse for things like checking plays without a server round trip (purely for UX; plays are still validated on the server)
  • Fit as many games as possible into a single server

Initial Design

I'm using tokio_tungstenite to handle async websocket connections in a multithreaded manner. Players send messages on the websocket connection to join a game and make plays. There is a global HashMap of games wrapped in Arc<Mutex<...>> so that it can be accessed across threads (the code below uses similar maps for players and sockets).

Questions

  1. Overall Architecture:
    Every time a socket, player, or game is updated, the entire map holding it has to be locked. This might be fine for a small number of players/games, but I imagine that at some point the server will spend most of its time waiting for locks. One idea I saw mentioned was to avoid locking by using message passing, but I couldn't figure out how to apply that to the Tokio way of doing things. The theory is that each game would have its own thread and broadcast game state via channels to the relevant sockets, while the sockets pass messages to the game thread via separate channels. The part that eludes me is how to create those game threads.
  2. Non-blocking Timers:
    There are situations where the server needs to perform an action after a certain period of time, e.g. when a player takes too long to make a play. I can't figure out how to handle this without blocking the websocket thread: if you sleep on the player's socket, they can't send a message until the thread wakes up again.
  3. AI Players:
    This is currently handled in the game logic: if the next player is a bot, the game automatically takes its turn after the previous play has been made. This works, but it has downsides. I'd rather this weren't part of the game logic, as it makes the logic harder to reason about, and it means that processing a turn takes a variable amount of time/resources, which in turn delays the response to the client. If there are multiple bots in a row, they could make their plays very quickly, which has implications for the UI (each play overwrites the last one before the player sees it). What I might end up doing is running a separate server for bots that acts as an alternate client. The game server can then treat them like any other player.
  4. Socket Disconnects/Reconnects:
    There is naive handling of this in the code below. It is limited somewhat by my lack of understanding of how to handle timers in this async context. Is there a better way of doing this kind of reconnect logic?

For further context, here is some code I vomited out to experiment with:


use std::{
    collections::HashMap,
    env,
    io::Error as IoError,
    net::SocketAddr,
    sync::{Arc, Mutex},
};

use futures::channel::mpsc::{unbounded, UnboundedSender};
use futures::{future, pin_mut, stream::TryStreamExt, StreamExt};

use tokio::net::{TcpListener, TcpStream};
use tokio_tungstenite::tungstenite::Message;

type Tx = UnboundedSender<Message>;
type PeerMap = Arc<Mutex<HashMap<SocketAddr, Tx>>>;

use bigtwolib::messages::{decode_message_from_string, MessageType};
use bigtwolib::game::{Game, Player};

type GameMap = Arc<Mutex<HashMap<String, Game>>>;
type PlayerGameMap = Arc<Mutex<HashMap<String, String>>>;
type PlayerSocketMap = Arc<Mutex<HashMap<String, Tx>>>;
type SocketPlayerMap = Arc<Mutex<HashMap<SocketAddr, String>>>;


async fn handle_connection(
    peer_map: PeerMap,
    raw_stream: TcpStream,
    addr: SocketAddr,
    games: GameMap,
    player_game: PlayerGameMap,
    player_sockets: PlayerSocketMap,
    socket_player: SocketPlayerMap,
) {
    println!("Incoming TCP connection from: {}", addr);

    let ws_stream = tokio_tungstenite::accept_async(raw_stream)
        .await
        .expect("Error during the websocket handshake occurred");
    println!("WebSocket connection established: {}", addr);

    // Insert the write part of this peer to the peer map.
    let (tx, rx) = unbounded();
    peer_map.lock().unwrap().insert(addr, tx.clone());

    let (outgoing, incoming) = ws_stream.split();

    let broadcast_incoming = incoming.try_for_each(|msg| {
        return match msg {
            Message::Text(_) => {
                let msg_text = msg.to_text().unwrap();
                println!("Received a message from {}: {}", addr, &msg_text);
                let decoded_message_result = decode_message_from_string(msg_text);

                if decoded_message_result.is_err() {
                    println!("Failed to decode message from socket, or the message type is not supported");
                    return future::ok(())
                }

                let decoded_message = decoded_message_result.unwrap();

                let response_msg: Message = match decoded_message {
                    MessageType::Debug{foo} => {
                        Message::from(format!("Got debug message with `foo` value `{}`", foo))
                    },
                    MessageType::JoinGame{game_uuid, player_uuid, display_name} => {
                        println!("Got join game message from player '{}' ({}) for game uuid `{}`", player_uuid, display_name, game_uuid);

                        if let Some(target_game) = games.lock().unwrap().get_mut(&*game_uuid){
                            let new_player = Player::new(player_uuid.clone(), display_name.clone());

                            // TODO: handle player reconnect; different socket but same player uid
                            if target_game.add_player(new_player).is_ok() {
                                player_sockets.lock().unwrap().insert(player_uuid.clone(), tx.clone());
                                socket_player.lock().unwrap().insert(addr.clone(), player_uuid.clone());
                                player_game.lock().unwrap().insert(player_uuid.clone(), game_uuid.clone());

                                println!("Game players: {}", target_game.print_players());
                                // TODO: maybe borrow the values? But then there are lifetime issues

                                if let Ok(response) = (MessageType::PlayerJoined {game_uuid: game_uuid.clone(), player_uuid: player_uuid.clone(), display_name: display_name.clone()}).to_json() {
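                                    // Lock the socket map once for the whole broadcast, and skip players
                                    // whose socket is missing or already closed instead of panicking.
                                    let sockets = player_sockets.lock().unwrap();
                                    for game_player in target_game.get_player_uuids() {
                                        if let Some(socket) = sockets.get(&game_player) {
                                            let _ = socket.unbounded_send(Message::from(response.clone()));
                                        }
                                    }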
                                } else {
                                    println!("Failed to serialise PlayerJoined message");
                                }

                                if let Ok(response) = (MessageType::WelcomeToGame {game_uuid: game_uuid.clone(), player_uuid: player_uuid.clone(), display_name: display_name.clone()}).to_json() {
                                    Message::from(response)
                                } else {
                                    println!("Failed to serialise WelcomeToGame response");
                                    Message::from("Server error; check console")
                                }
                            } else {
                                if let Ok(response) = (MessageType::GameFull).to_json() {
                                    Message::from(response)
                                } else {
                                    println!("Failed to serialise GameFull response");
                                    Message::from("Server error; check console")
                                }
                            }
                        } else {
                            if let Ok(response) = (MessageType::InvalidGameUUID).to_json() {
                                Message::from(response)
                            } else {
                                println!("Failed to serialise InvalidGameUUID response");
                                Message::from("Server error; check console")
                            }
                        }
                    }
                    _ => {
                        Message::from("What are you doing?!")
                    }
                };

                let peers = peer_map.lock().unwrap();

                // TODO: global hashmap of player uuid to socket?
                // TODO: use the player uuid in the game to get the socket and send a message
                // TODO: player uuid to game uuid map

                // TODO: mutex on the global socket hashmap? Will this break at scale with lots
                //   of threads trying to get the lock?

                // TODO: game per thread? store player sockets per game?

                peers.get(&addr).unwrap().unbounded_send(response_msg.clone()).unwrap();

                future::ok(())
            }
            _ => {
                future::ok(())
            }
        }
    });

    let receive_from_others = rx.map(Ok).forward(outgoing);

    pin_mut!(broadcast_incoming, receive_from_others);
    future::select(broadcast_incoming, receive_from_others).await;

    println!("{} disconnected", &addr);
    peer_map.lock().unwrap().remove(&addr);

    // TODO: delay player cleanup to allow them to reconnect

    // This could error out (poisoned locks); more graceful handling needed.
    // Each lock on a lookup map below is a temporary released at the end of its
    // statement, so no other guard is held by the time `games` is locked. The join
    // path holds `games` while locking the other maps, so taking the locks in the
    // opposite order here (as this code originally did) could deadlock.
    let removed_player = socket_player.lock().unwrap().remove(&addr);

    if let Some(player_uuid) = removed_player {
        // Remove the player socket from the map
        player_sockets.lock().unwrap().remove(&player_uuid);
        println!("Removed the player socket map values");

        let removed_game = player_game.lock().unwrap().remove(&player_uuid);

        if let Some(game_uuid) = removed_game {
            println!("Removed the player game map value");

            if let Some(game) = games.lock().unwrap().get_mut(&game_uuid) {
                game.remove_player(player_uuid.clone());
                println!("Removed the player from the game instance");
            } else {
                println!("Failed to lookup the game for game uuid {}", game_uuid)
            }
        } else {
            println!("Failed to lookup the game uuid for player uuid {}", player_uuid)
        }
    } else {
        println!("Failed to lookup the player uuid for socket {}", addr)
    }

}

#[tokio::main]
async fn main() -> Result<(), IoError> {
    let addr = env::args().nth(1).unwrap_or_else(|| "127.0.0.1:8080".to_string());

    let state = PeerMap::new(Mutex::new(HashMap::new()));
    let games = GameMap::new(Mutex::new(HashMap::new()));
    let player_game = PlayerGameMap::new(Mutex::new(HashMap::new()));
    let player_sockets = PlayerSocketMap::new(Mutex::new(HashMap::new()));
    let socket_player = SocketPlayerMap::new(Mutex::new(HashMap::new()));


    let debug_game = Game::new();
    println!("Debug game UUID: {}", debug_game.id);
    // games.lock().unwrap().insert(debug_game.id.clone(), debug_game);
    games.lock().unwrap().insert("debuggameuuid".to_string(), debug_game);

    // Create the TCP listener we'll accept connections on (#[tokio::main] has already started the runtime).
    let try_socket = TcpListener::bind(&addr).await;
    let listener = try_socket.expect("Failed to bind");
    println!("Listening on: {}", addr);

    // Let's spawn the handling of each connection in a separate task.
    while let Ok((stream, addr)) = listener.accept().await {
        tokio::spawn(handle_connection(state.clone(), stream, addr, games.clone(), player_game.clone(), player_sockets.clone(), socket_player.clone()));
    }

    Ok(())
}

Any suggestions would be greatly appreciated!

Let me outline the architecture I used for my telnet-chat project:

Like your project, it creates a task for each connection, and that task uses channels to communicate with the outside world. However, in telnet-chat, I also spawn a task designed to route messages between the clients. The way this setup works is that whenever a client sends a message, the connection task first sends that message to the routing task. The routing task then looks up which other clients that message needs to go to using a local HashMap owned exclusively by the routing task (i.e. no lock around it), and the routing task then sends those messages to those client tasks.

I would suggest that you use a similar architecture here: instead of having the client tasks send messages directly, set up a routing task in the middle and give that routing task exclusive ownership of the information needed to route each message to the right place. This is useful because the routing task never has to deal with network IO, and the connection tasks can focus exclusively on it.

For your project, you might want to set up one routing task for the lobby, then spawn a game task for each game, having the game task handle routing for clients in that game. This way independent games don't use the same resources at all.

I give an outline of the above approach in my blog post Actors with Tokio.

Let me give some other comments in no particular order:

  • It sounds like you could implement timers by using tokio::select! to abort the ws_stream.next() call if it takes too long.
  • I generally recommend using the variant in Tokio over the one in futures in the cases where it exists in both places. Mainly this refers to select, pin_mut and especially channels. (Though if you use Tokio's select!, you won't need pin_mut at all.)
  • I generally recommend avoiding try_for_each and for_each for your loops. Just use a completely ordinary loop that calls next in each iteration. Real loops are much more flexible, and you avoid the future::ok(()) silliness. (There's an example of this in the actor blog post.)
  • You should avoid unbounded channels. They can easily result in unbounded memory usage. In telnet-chat, I actually use try_send in some places, killing the client if it can't keep up.
  • Code like this becomes a lot easier to read if you introduce more functions and modules. Wrap stuff in structs!
  • One interesting way to implement AI players is to write a function that uses channels in the exact same way as what handle_connection does, but internally uses an AI rather than a TCP socket. It looks exactly like a real player to the rest of the code, since the rest of the code is just talking to whatever is on the other end of the channel. This way, you can isolate the AI code to its own part of the codebase.
  • If you want to gracefully support disconnects, I would not do it like this. I would give the players an ID and let them specify their ID when they connect. The router task would replace their channel in its map when they reconnect. The router task can check for old disconnected clients every 10 minutes or so and remove them. The connection task should die immediately.

@alice I'm still digesting everything in your reply but just wanted to say thank you for such an amazing response!
