Making a simple game with Warp and websockets

I should prefix this by saying I’m a complete nooblord when it comes to any kind of web and/or async programming.

I’m trying my hand at making a toy clone of the game Fibbage from the Jackbox series. If you don’t know how it works, the short version is it’s a trivia game where you can submit fake answers to fool the other players. My current idea is to have a backend written in Warp that people connect to via a web interface, with the rest of the communication then happening over websockets. I’m not worrying about the frontend for now, I’m just treating it as a black box that sends and receives messages.

I’m currently a bit stuck on how to structure the backend. I’ve been using the websocket chat example from Warp as a starting point. There the server provides a route that establishes a websocket connection, and on success, runs an async function user_connected that first adds the user to a HashMap and then broadcasts all messages they send to the other users. Thus, there is an async function running for each user until they disconnect.

I think this isn’t really the right model for my use case, because first of all, players shouldn’t be sending messages all over the place, and second, there has to be some sort of central game loop (ask a question → collect fake answers → send choices to players → …). The best idea I’ve had so far is this: the first player that connects can send a message to start the game, and when that happens, their instance of user_connected calls the main game loop as an async function.

(As a side issue, I would like the websocket route that allows new players to connect to the server to become unavailable at that point. I haven’t figured out how to make a route conditional like that yet.)

Am I on the right track at all? Am I overengineering this? I suspect that async is total overkill for this situation, but I picked Warp for no particular reason and that seems to be how Warp rolls, so I’m going with it.

Actually, aside from the big structural issue, I have a question about async coding style. Consider this function that collects the fake answers:

async fn collect_fibs(players: Players) -> HashMap<usize, String> {
    let mut players = players.lock().await;
    let mut fibs = HashMap::with_capacity(players.len());

    for (id, Player {rx, ..}) in players.iter_mut() {
        if let GameMessage::Fib(s) = serde_json::from_str(rx.next().await.unwrap().unwrap().to_str().unwrap()).unwrap() {
            fibs.insert(*id, s);
        }
    }
    
    fibs
}

Aside from all the unwrapping, which I realize isn’t very principled, is this how I iterate asynchronously over a collection or is there a more idiomatic way?

You definitely want a single task that handles the game state with channels for message passing. You should try to build your code with simple ownership semantics, which means avoiding mutexes if you can.

Mutexes can be a lot of trouble because you can only have one lock at a time. For example, if any other task tries to access players while your collect_fibs is running, it will have to wait until collect_fibs finishes after the loop.

You probably do not want to mix serde deserialization with your game loop.

2 Likes

I suppose that means that warp’s way of establishing the websocket connections (one task per player) is out, right?

By the logic of the program, no other function should need to access players at that time. Which raises the question of what purpose the mutex actually serves.

By that, do you mean that I should extract the de/serialization logic into a separate function that gets called in the loop?

What I'm describing below is what I believe is the best approach for this kind of stuff:

No, having a task per player is fine, but that task shouldn't be doing any game logic. It should be rather stupid:

  1. It should read and write bytes from the web socket.
  2. If there's any json parsing needed, do it here.
  3. Send any messages it receives to the game loop task using an mpsc channel.
  4. And when the game loop sends messages to the user, the user's task takes care of converting it to json and writing it to the web socket.

All the logic should happen in the game loop. The game loop should use mpsc channels to talk with the user's, which it does by sending a message to the user's task. The user's task then turns the message into json and writes it.

It serves no purpose. When only one task has access to a value, you do not need a mutex.

2 Likes

Thank you, that makes a lot of sense. One question, though: there are two channels between each player and the main loop, for both directions, right?

Yeah, that’s what I was getting at.

Thank you very much. This has made the design much clearer.

Yeah. Messages going to the game loop should probably be a single mpsc channel that all user tasks share (You can clone senders). Messages to the users should be a separate channel per user.

Regarding new users: You can send channels as messages in a channel. You probably want an enum for your messages so you can send different types of messages.

2 Likes

Great, I’ll try that!

1 Like

Good luck with it!

2 Likes

Thank you. I’ll likely come back with further questions :sweat_smile:

Good idea! I'll be happy to answer any questions, and if you feel the urge to use a mutex, I'll be happy to show you how to avoid that.

1 Like

Thanks to your help, I’ve already made quite a bit of progress and I think the basic structure is now in place. I have two design questions, though.

  1. Should the main loop be “monolithic”, i.e., should there be just one big loop that handles every possible event that might happen, or should I subdivide this more? For example, adding new players should not be possible past the point where the game is started. Do I enforce this through the structure of the main loop or by making sure that the RegisterPlayer event is never sent after the StartGame event?
  2. The individual player tasks mediate between the frontend and the main loop and hence have two incoming connections each. This means I can’t just loop over the events coming in on one of them. How do I organize this?

This is a difficult question. I think I would go for monolithic. You can make the game state an enum too, and have separate functions for each game state to split it up more. You can just kill connections when they send invalid messages, but watch out for race conditions where a client sends a lobby-only message just before the game starts.

The Tokio crate provides a select! macro for this kind of thing. It can be a bit difficult to use, but it's hard for me to give an example because I don't know how your websocket read code looks.

Monolithic does look like the much simpler solution. I’ll try that first. The question remains how I can selectively refuse connections after a certain game state has been reached. Surely this involves some kind of global mutable state.

I’ll have a look at it, thanks.

One option is to just immediately kill them if they arrive at the wrong time. You can do this by dropping the sender for the task's channel, because this will inform the task that the sender has been dropped, which it can react to by shutting itself down. (The .recv() function returns None when this happens)

Alternatively you can use a channel to tell warp to shut down the accept loop, if warp allows that kind of stuff. (I don't know if you can do that while keeping existing connections open)
Finally you could consider putting an AtomicBool in an Arc.

So the player’s task notifies the main loop that it would like to play and the main task just immediately terminates the connection? That makes sense. I suppose it could also send a refusal message back.

I’ll have to investigate whether warp allows me to disable a route based on runtime changes. If I can’t figure it out, I’ll fall back on the previous suggestion.

Well, I’ve come quite a long way. Here’s my main.rs. Note that the "frontend" is currently just a textbox that you can use to send JSON messages to the host.

use futures::channel::mpsc::{unbounded, UnboundedReceiver as Receiver, UnboundedSender as Sender};
use futures::channel::oneshot;
use futures::{future::ready, SinkExt, StreamExt};
use rand::prelude::*;
use rand::rngs::SmallRng;
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
use tokio::select;
use warp::ws::{Message as WsMessage, WebSocket};
use warp::Filter;

type PlayerId = usize;
type Players = BTreeMap<PlayerId, Player>;
type Score = u16;

mod msg {
    use super::*;
    use serde::{Deserialize, Serialize};

    #[derive(Debug, Clone)]
    pub enum FromGame {
        SetId(PlayerId),
        PlayerDisconnected(String),
        PlayerConnected(String),
        Players(Vec<String>),
        RoundStarted(usize, String),
        PresentChoices(Vec<String>),
        PointsAwarded(BTreeMap<usize, ScoreReport>),
        GuessedCorrectAnswer,
    }

    pub enum ToGame {
        JoinRequest(oneshot::Sender<JoinResponse>),
        RegisterPlayer(String, Sender<FromGame>),
        StartGame,
        PlayerDisconnected(PlayerId),
        SubmitFib(PlayerId, String),
        SubmitAnswer(PlayerId, String),
    }

    #[derive(Serialize, Deserialize, Debug, Clone)]
    pub enum FromPlayer {
        SetName(String),
        SubmitFib(String),
        SubmitAnswer(String),
        StartGame,
    }

    #[derive(Debug, Clone, Serialize, Deserialize)]
    pub enum ToPlayer {
        Prompt(String),
        PlayerDisconnected(String),
        PlayerConnected(String),
        Players(Vec<String>),
        RoundStarted(usize, String),
        UnrecognizedMessage(String),
        GameRunning,
        JoinRequestAccepted,
        PresentChoices(Vec<String>),
        PointsAwarded(BTreeMap<usize, ScoreReport>),
        GuessedCorrectAnswer,
    }

    #[derive(Debug, Clone, Copy)]
    pub enum JoinResponse {
        Accepted,
        GameRunning,
    }
}
struct Player {
    name: String,
    score: Score,
    tx: Sender<msg::FromGame>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ScoreReport {
    name: String,
    fooled: Vec<(usize, String)>,
    fib: String,
    correct: bool,
    points_awarded: Score,
    previous_total: Score,
}

impl ScoreReport {
    fn new(player: &Player, fib: String) -> Self {
        ScoreReport {
            name: player.name.clone(),
            fooled: Vec::new(),
            fib: fib.clone(),
            correct: false,
            points_awarded: 0,
            previous_total: player.score,
        }
    }
}

#[derive(Clone, Debug, Deserialize)]
struct Prompt {
    prompt: String,
    answer: String,
}

struct Game {
    inbox: Receiver<msg::ToGame>,
    running: bool,
    current_id: usize,
    players: Players,
    round: usize,
    prompt: Option<Prompt>,
    prompts: Vec<Prompt>,
    fibs: BTreeMap<String, Vec<PlayerId>>,
    choices: Vec<String>,
    answers: BTreeMap<usize, String>,
    rng: SmallRng,
}

impl Game {
    fn new(inbox: Receiver<msg::ToGame>) -> Self {
        let prompts =
            serde_json::from_str(&std::fs::read_to_string("prompts.json").expect("Reading prompts.json"))
                .expect("Parsing prompts");
        Game {
            inbox,
            running: false,
            current_id: 1,
            players: BTreeMap::new(),
            round: 0,
            prompt: None,
            prompts,
            fibs: BTreeMap::new(),
            choices: Vec::new(),
            answers: BTreeMap::new(),
            rng: SmallRng::from_entropy(),
        }
    }

    async fn broadcast(&mut self, m: msg::FromGame) {
        for Player { tx, .. } in self.players.values_mut() {
            tx.send(m.clone())
                .await
                .expect("Sending message to player loop");
        }
    }

    async fn new_round(&mut self) {
        self.round += 1;
        self.fetch_prompt();
        let prompt = self
            .prompt
            .as_ref()
            .expect("Prompt should be loaded")
            .clone();
        self.fibs.clear();
        self.choices.clear();
        self.answers.clear();
        self.broadcast(msg::FromGame::RoundStarted(
            self.round,
            prompt.prompt.clone(),
        ))
        .await;
    }

    fn fetch_prompt(&mut self) {
        let n = self.rng.next_u32() as usize % self.prompts.len();
        self.prompt = Some(self.prompts.remove(n));
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = unbounded();
    let tx = warp::any().map(move || tx.clone());
    tokio::task::spawn(main_loop(rx));

    // GET /fibbagews -> websocket upgrade
    let game = warp::path("fibbagews").and(warp::ws()).and(tx).map(
        move |ws: warp::ws::Ws, tx: Sender<msg::ToGame>| {
            ws.on_upgrade(|socket| player_loop(socket, tx))
        },
    );

    // GET /fibbage -> index html
    let index = warp::path("fibbage").map(|| warp::reply::html(INDEX_HTML));

    let routes = index.or(game);

    warp::serve(routes).run(([127, 0, 0, 1], 3000)).await;
}

async fn main_loop(rx: Receiver<msg::ToGame>) {
    let mut game = Game::new(rx);
    while let Some(msg) = game.inbox.next().await {
        match msg {
            msg::ToGame::JoinRequest(tx) => {
                if game.running {
                    tx.send(msg::JoinResponse::GameRunning)
                        .expect("Sending response to join request");
                } else {
                    tx.send(msg::JoinResponse::Accepted)
                        .expect("Sending response to join request");
                }
            }

            msg::ToGame::RegisterPlayer(name, mut tx) => {
                if game.running {
                    continue;
                }

                // Tell this player what their number is and who the other players are
                tx.send(msg::FromGame::SetId(game.current_id))
                    .await
                    .expect("Sending message to player loop");
                let player_list: Vec<String> =
                    game.players.values().map(|p| p.name.clone()).collect();
                tx.send(msg::FromGame::Players(player_list))
                    .await
                    .expect("Sending message to player loop");

                // Notify other players

                game.broadcast(msg::FromGame::PlayerConnected(name.clone()))
                    .await;

                let player = Player {
                    name: name.clone(),
                    tx,
                    score: 0,
                };

                game.players.insert(game.current_id, player);
                game.current_id += 1;
            }

            msg::ToGame::PlayerDisconnected(id) => {
                if let Some(Player { name, .. }) = &game.players.get(&id) {
                    let name = name.clone();
                    game.players.remove(&id);
                    game.broadcast(msg::FromGame::PlayerDisconnected(name.clone()))
                        .await;
                }
            }

            msg::ToGame::StartGame => {
                if !game.running {
                    game.running = true;
                    game.new_round().await;
                }
            }

            msg::ToGame::SubmitFib(player, text) => {
                if let Some(Player { tx, .. }) = game.players.get_mut(&player) {
                    if game.fibs.values().any(|v| v.contains(&player)) {
                        // Player has already submitted a fib
                        continue;
                    }
                    let real_answer = &game
                        .prompt
                        .as_ref()
                        .expect("Prompt should exist when collecting fibs")
                        .answer;

                    if text == *real_answer {
                        tx.send(msg::FromGame::GuessedCorrectAnswer)
                            .await
                            .expect("Sending message to player loop");
                        continue;
                    }

                    game.fibs.entry(text).or_insert(Vec::new()).push(player);
                    // Check if we have received a fib from everybody
                    // ∀i ∈ Players ∃v ∈ Fibs. i submitted v
                    if game
                        .players
                        .keys()
                        .all(|i| game.fibs.values().any(|v| v.contains(&i)))
                    {
                        game.choices.push(real_answer.to_owned());
                        for fib in game.fibs.keys() {
                            game.choices.push(fib.clone());
                        }

                        game.choices.shuffle(&mut game.rng);
                        for (i, Player { tx, .. }) in game.players.iter_mut() {
                            let (submitted, _) = game
                                .fibs
                                .iter()
                                .find(|(_, v)| v.contains(&i))
                                .expect("Every player must have submitted a fib");
                            tx.send(msg::FromGame::PresentChoices(
                                game.choices
                                    .iter()
                                    .filter(|f| *f != submitted)
                                    .cloned()
                                    .collect(),
                            ))
                            .await
                            .expect("Sending message to player loop");
                        }
                    }
                }
            }

            msg::ToGame::SubmitAnswer(player, answer) => {
                if !game.players.contains_key(&player) || game.answers.contains_key(&player) {
                    // Either we don't know this player or they've already submitted an answer
                    continue;
                }
                game.answers.insert(player, answer);
                if game.players.keys().all(|i| game.answers.contains_key(&i)) {
                    // Award points
                    let mut scores = BTreeMap::new();
                    for (player, choice) in game.answers.iter() {
                        let (player_fib, _) = game
                            .fibs
                            .iter()
                            .find(|(_, v)| v.contains(&player))
                            .expect("Player must have submitted a fib");
                        match game.fibs.get(choice) {
                            None => {
                                // Player picked the correct answer
                                let mut entry = scores.entry(*player).or_insert(ScoreReport::new(
                                    &game.players[player],
                                    player_fib.clone(),
                                ));
                                entry.points_awarded += 200;
                                entry.correct = true;
                            }
                            Some(fibbers) => {
                                // Player picked a fib → all players who submitted it get points
                                for p in fibbers.iter() {
                                    debug_assert!(
                                        !fibbers.is_empty(),
                                        "If it's a fib someone must have submitted it"
                                    );
                                    let mut entry = scores.entry(*p).or_insert(ScoreReport::new(
                                        &game.players[p],
                                        choice.clone(),
                                    ));
                                    entry.points_awarded += 100;
                                    entry
                                        .fooled
                                        .push((*player, game.players[player].name.clone()))
                                }
                            }
                        }
                    }
                    game.broadcast(msg::FromGame::PointsAwarded(scores.clone()))
                        .await;
                    game.new_round().await;
                }
            }
        }
    }
}
async fn player_loop(ws: WebSocket, mut to_game: Sender<msg::ToGame>) {
    let (mut to_ws, mut from_ws) = {
        // Split the socket into a sender and receive of messages.
        let (to_ws1, from_ws1) = ws.split();

        let to_ws2 = to_ws1.with(|msg: msg::ToPlayer| {
            let res: Result<WsMessage, warp::Error> = Ok(WsMessage::text(
                serde_json::to_string(&msg).expect("Converting message to JSON"),
            ));
            ready(res)
        });

        (to_ws2, from_ws1)
    };

    let (greet_tx, greet_rx) = oneshot::channel();
    to_game
        .send(msg::ToGame::JoinRequest(greet_tx))
        .await
        .expect("Sending message to main loop");
    match greet_rx.await.expect("Receive answer to join request") {
        msg::JoinResponse::Accepted => {
            to_ws
                .send(msg::ToPlayer::JoinRequestAccepted)
                .await
                .expect("Sending message over WebSocket");
        }
        msg::JoinResponse::GameRunning => {
            to_ws
                .send(msg::ToPlayer::GameRunning)
                .await
                .expect("Sending message over WebSocket");
            return;
        }
    }

    let (to_me, mut from_game) = unbounded();

    let mut my_id = 0;

    loop {
        select! {
            msg = from_ws.next() => {
                let msg = match msg {
                    Some(msg) => match msg {
                        Ok(msg) => msg,
                        Err(_) => {
                            to_game.send(msg::ToGame::PlayerDisconnected(my_id)).await.expect("Sending message to main loop");
                            return;
                        }
                    }
                    None => {
                        to_game.send(msg::ToGame::PlayerDisconnected(my_id)).await.expect("Sending message to main loop");
                        return;
                    }
                };
                if msg.is_close() {
                    to_game.send(msg::ToGame::PlayerDisconnected(my_id)).await.expect("Sending message to main loop");
                    return;
                }
                let msg = msg.to_str().expect("Expecting a string message");
                match serde_json::from_str(msg) {
                    Ok(msg::FromPlayer::SetName(name)) => {
                        to_game.send(msg::ToGame::RegisterPlayer(name, to_me.clone())).await.expect("Sending message to main loop");
                    }

                    Ok(msg::FromPlayer::SubmitFib(text)) => {
                        to_game.send(msg::ToGame::SubmitFib(my_id, text)).await.expect("Sending message to main loop");
                    }

                    Ok(msg::FromPlayer::SubmitAnswer(number)) => {
                        to_game.send(msg::ToGame::SubmitAnswer(my_id, number)).await.expect("Sending message to main loop");
                    }

                    Ok(msg::FromPlayer::StartGame) => {
                        to_game.send(msg::ToGame::StartGame).await.expect("Sending message to main loop");
                    }

                   Err(_) => {
                       to_ws.send(msg::ToPlayer::UnrecognizedMessage(msg.to_owned())).await.expect("Sending message to main loop");
                   }
                }

            }
            msg = from_game.next() => {
                match msg.expect("Receiving message from main loop") {
                    msg::FromGame::SetId(id) => my_id = id,
                    msg::FromGame::PlayerDisconnected(name) => {
                        to_ws.send(msg::ToPlayer::PlayerDisconnected(name)).await.expect("Sending message over WebSocket");
                    }

                    msg::FromGame::PlayerConnected(name) => {
                        to_ws.send(msg::ToPlayer::PlayerConnected(name)).await.expect("Sending message over WebSocket");
                    }

                    msg::FromGame::Players(players) => {
                        to_ws.send(msg::ToPlayer::Players(players)).await.expect("Sending message over WebSocket");
                    }

                    msg::FromGame::RoundStarted(n, prompt) => {
                        to_ws.send(msg::ToPlayer::RoundStarted(n, prompt)).await.expect("Sending message over WebSocket");
                    }

                    msg::FromGame::PresentChoices(choices) => {
                        to_ws.send(msg::ToPlayer::PresentChoices(choices)).await.expect("Sending message over WebSocket");
                    }

                    msg::FromGame::GuessedCorrectAnswer => {
                        to_ws.send(msg::ToPlayer::GuessedCorrectAnswer).await.expect("Sending message over WebSocket");
                    }

                    msg::FromGame::PointsAwarded(report) => {
                        to_ws.send(msg::ToPlayer::PointsAwarded(report)).await.expect("Sending message over WebSocket");
                    }
                }
            }
        }
    }
}

static INDEX_HTML: &str = r#"
<!DOCTYPE html>
<html>
    <head>
        <title>Fibbage Test</title>
    </head>
    <body>
        <h1>Fibbage test interface</h1>
        <div id="game">
            <p><em>Connecting...</em></p>
        </div>
        <input type="text" id="text" />
        <button type="button" id="send">Send</button>
        <script type="text/javascript">
        var uri = 'wss://' + location.host + '/fibbagews';
        var ws = new WebSocket(uri);

        function message(data) {
            var line = document.createElement('p');
            line.innerHTML = data;
            game.appendChild(line);
        }

        ws.onopen = function() {
            game.innerHTML = "<p><em>Connected!</em></p>";
        }

        ws.onmessage = function(msg) {
            message('Received: ' + msg.data);
        };

        send.onclick = function() {
            var msg = text.value;
            ws.send(msg);
            text.value = '';

            message('Sent: ' + msg);
        };
        </script>
    </body>
</html>
"#;

It's possible I've way overengineered this, it's something I'm prone to. In any case I learned a ton. I'd be very grateful for any reviews or commentary.

I have not had a thorough look at it, but it seems reasonable. Generally I recommend to split it up into several files, and to try to split up your functions into several functions.

Additionally, I would try to do something about stuff like this:

let msg = match msg {
    Some(msg) => match msg {
        Ok(msg) => msg,
        Err(_) => {
            to_game.send(msg::ToGame::PlayerDisconnected(my_id)).await.expect("Sending message to main loop");
            return;
        }
    }
    None => {
        to_game.send(msg::ToGame::PlayerDisconnected(my_id)).await.expect("Sending message to main loop");
        return;
    }
};

For example, you might write it like this:

let msg = msg
    .ok_or(|| msg::ToGame::PlayerDisconnected(my_id))
    .and_then(|inner| {
        inner.map_err(|_| msg::ToGame::PlayerDisconnected(my_id))
    });

let msg = match msg {
    Ok(msg) => msg,
    Err(to_game_msg) => {
        to_game.send(to_game_msg)
            .await
            .expect("Main loop has shut down.");
        return;
    },
};

Whether this is simpler is a matter of taste, but I recommend trying to simplify it in some way. Note that the error you're expect-ing away is a reasonable way to detect graceful shutdown of the server.

I’ll see about splitting stuff up and reorganizing it a bit.

Do you mean the possible Err result of to_game.send(…).await? In that case, I should probably send a notification over the Websocket and then shut the player task down as well, right?

Yeah, that's the one I mean. You should have some shutdown strategy, at least.

On the subject of error handling, what’s a sensible amount of effort to invest into errors for a project of this size? I would just use anyhow and return boxed errors everywhere, but I still need some custom errors e.g. when ws.next().await is a None, because there’s no error there for me to pass on. Does it make sense to create an error enum with just one or two variants for these edge cases and then immediately put them in an anyhow::Error anyway?