The main thread (game loop) cannot see the updates from other thread?

Hi,

I'm currently working on implementing a two-players mode for my first project - a tetris game in the terminal.

In this mode, when player 1 starts the game, it will open a TCP listener and wait for the player 2 to connect. As soon as player 2 connects to the player 1's address, game will start:

fn main() -> Result<()> {
    let player_barrier = Arc::new(Barrier::new(2));
    let conn = open()?;
    let stdout = io::stdout();
    let args: Vec<String> = env::args().collect();
    if args.len() == 1 {
        let listener = TcpListener::bind("127.0.0.1:8080")?;
        println!("Server started. Waiting for the client to connect.");

        let (stream, _) = listener.accept()?;
        println!("Player 2 connected.");

        let mut stream_clone = stream.try_clone()?;
        let thread_barrier = Arc::new(Barrier::new(2));
        let thread_barrier_clone = thread_barrier.clone();
        let game = Arc::new(Mutex::new(Game::new(conn, Some(stream))));

        thread::spawn({
            let game_clone = Arc::clone(&game);
            move || {
                thread_barrier_clone.wait();
                receive_message(&mut stream_clone, &game_clone);
            }
        });

        thread_barrier.wait();

        let player_barrier_clone = player_barrier.clone();

        {
            let mut game_lock = game.lock().unwrap();
            game_lock.start(stdout);
        }

        player_barrier_clone.wait();
    } else {
        let server_address = &args[1];
        if server_address.len() != 0 {
            let stream = TcpStream::connect(server_address)?;

            let mut stream_clone = stream.try_clone()?;
            let thread_barrier = Arc::new(Barrier::new(2));
            let thread_barrier_clone = thread_barrier.clone();
            let game = Arc::new(Mutex::new(Game::new(conn, Some(stream))));

            thread::spawn({
                let game_clone = Arc::clone(&game);
                move || {
                    thread_barrier_clone.wait();
                    receive_message(&mut stream_clone, &game_clone);
                }
            });

            thread_barrier.wait();

            let player_barrier_clone = player_barrier.clone();

            {
                let mut game_lock = game.lock().unwrap();
                game_lock.start(stdout);
            }

            player_barrier_clone.wait();
        }
    }

    player_barrier.wait();

    Ok(())
}

During the gameplay, when one of players clear some lines, the number of cleared lines will be sent to other player via TCP stream:

        if num_filled_rows > 0 {
            let message = num_filled_rows.to_string();
            if let Some(stream) = &mut self.stream {
                send_message(stream, &message);
            }
        }
fn send_message(stream: &mut TcpStream, message: &str) {
    if let Err(err) = stream.write_all(message.as_bytes()) {
        eprintln!("Error sending message: {}", err);
    }
}

On the receiving side, I'm trying to add some random lines to the bottom of the play grid:

fn receive_message(stream: &mut TcpStream, game: &Arc<Mutex<Game>>) {
    let mut buffer = [0u8; 1];
    loop {
        match stream.read(&mut buffer) {
            Ok(n) if n > 0 => {
                let s = String::from_utf8_lossy(&buffer[0..n]);
                println!("Received: {}", s);

                let num_cleared_rows = s.parse().unwrap();
                let mut new_row = vec![LEFT_BORDER_CELL; 1]
                    .into_iter()
                    .chain(vec![I_CELL; PLAY_WIDTH])
                    .into_iter()
                    .chain(vec![RIGHT_BORDER_CELL; 1].into_iter())
                    .collect::<Vec<Cell>>();
                let mut rng = rand::thread_rng();
                let random_column = rng.gen_range(1..PLAY_WIDTH + 1);
                new_row[random_column] = EMPTY_CELL;

                let mut game_lock = game.lock().unwrap();
                for _ in 0..num_cleared_rows {
                    game_lock.play_grid.insert(PLAY_HEIGHT + 1, new_row.clone());
                }
                drop(game_lock);

                let mut stdout = io::stdout();
                game.lock().unwrap().render(&mut stdout);
            }
            Ok(_) | Err(_) => {
                break;
            }
        }
    }
}

However, I only see the "Received: <num_cleared_lines>" message first time clearing and the play grid on the receiving side is not updated.

If I do not wrap Game in a Arc<Mutex<Game>>, then I can see the "Received: ..." message every time a player clear some lines:

        let mut game = Game::new(conn, Some(stream));

        thread::spawn({
            move || {
                thread_barrier_clone.wait();
                receive_message(&mut stream_clone);
            }
        });

        thread_barrier.wait();

        let player_barrier_clone = player_barrier.clone();

        game.start(stdout);
fn receive_message(stream: &mut TcpStream) {
    let mut buffer = [0u8; 1];
    loop {
        match stream.read(&mut buffer) {
            Ok(n) if n > 0 => {
                let s = String::from_utf8_lossy(&buffer[0..n]);
                println!("Received: {}", s);
            }
            Ok(_) | Err(_) => {
                break;
            }
        }
    }
}

So, what is the reason, and how can I update the play grid in the main thread whenever a message is received in the receive_message thread?

You can find my complete code here.

Thanks,

Your main thread holds a lock forever here: tetris-tui/src/main.rs at multiplayer · quantonganh/tetris-tui (github.com) (same in the client branch). This lock cannot be released until the main loop (in handle_event) returns.

When receive_message attempts to acquire the lock, you get a deadlock.

1 Like

@parasyte Thank you for your help.

I solved it by using mpsc::channel to transfer data from receive_message thread to the main thread:

            let (sender, receiver): (Sender<String>, Receiver<String>) = channel();
            let mut game = Game::new(conn, Some(stream), Some(receiver));

            thread::spawn({
                move || {
                    thread_barrier_clone.wait();
                    receive_message(&mut stream_clone, sender);
                }
            });
fn receive_message(stream: &mut TcpStream, sender: Sender<String>) {
    let mut buffer = [0u8; 1];
    loop {
        match stream.read(&mut buffer) {
            Ok(n) if n > 0 => {
                let s = String::from_utf8_lossy(&buffer[0..n]);
                println!("Received: {}", s);

                if let Err(err) = sender.send(s.to_string()) {
                    eprintln!("Error sending message to the main thread: {}", err);
                }
            }
            Ok(_) | Err(_) => {
                break;
            }
        }
    }
}
                if let Some(receiver) = &self.receiver {
                    for s in receiver.try_iter() {
                        let mut rng = rand::thread_rng();

                        let cells = vec![I_CELL, O_CELL, T_CELL, S_CELL, Z_CELL, T_CELL, L_CELL];
                        let random_cell_index = rng.gen_range(0..cells.len());
                        let random_cell = cells[random_cell_index].clone();

                        let mut new_row = vec![LEFT_BORDER_CELL; 1]
                            .into_iter()
                            .chain(vec![random_cell; PLAY_WIDTH])
                            .into_iter()
                            .chain(vec![RIGHT_BORDER_CELL; 1].into_iter())
                            .collect::<Vec<Cell>>();
                        let random_column = rng.gen_range(1..=PLAY_WIDTH);
                        new_row[random_column] = EMPTY_CELL;

                        let num_cleared_rows = s.parse().unwrap();
                        for _ in 0..num_cleared_rows {
                            self.play_grid.remove(1);
                            self.play_grid.insert(PLAY_HEIGHT, new_row.clone());
                        }
                    }
                }

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.