Am I using tonic/tokio correctly? [using tonic for grpc streaming]

Hi rust experts! I was trying to build a game server with tonic and I am currently facing some problems. The main problems is that I am not aware a good way to pass message from the grpc server to my game state machine (process the game events).

To be more specifically, the game state machine will be running as an async task. When grpc server receive a call, it will be sending message to the the game state machine using tokio::mpsc channel. The game state machine then do some updates internally. However, it will also try to broadcast/send some events to the players (or a single player). Initially, I was thinking to use grpc stream to stream the events to players.

My current implementation is like below but it doesn't feel right (especially I need Arc<Mutex<Receiver<T>>> in the grpc "Handler". Am I doing something completely wrong? Maybe I should not use Grpc for this purpose (using raw TCP/UDP stream)? Have someone done similar things? Any comments would be very welcome. Thanks.

// server.rs
pub struct GameStateMachine{

    // channel to receive the player action events
    event_chan: mpsc::Receiver<PlayerActionEvent>,

    // broadcast channel to send public events to all players
    broadcast_chan: broadcast::Sender<GameEvents>,
}

impl GameStateMachine{
    pub async fn run(&mut self){
        // running in back group, will listen on the event_chan
        // and update state. StateMachine will send events using 
        // the broadcast Sender
    }

    pub fn new(
        event_chan: mpsc::Receiver<PlayerActionEvent>,
        broadcast_chan: broadcast::Sender<GameEvents>,
    ) -> Self{
        //...
    }

}

// grpc_server.rs
pub struct MyGrpcService{
    event_chan: mpsc::Sender<PlayerActionEvent>,

    broadcast_chan: broadcast::Receiver<GameEvents>,
}

/// Tonic grpc service 
#[tonic::async_trait]
impl PlayerAction for MyGrpcService{
    async fn send_action(
        &self,
        req: Request<ActionRequest>
    ) -> Result<Response<ActionResponse>, Status>{
        // ...
        // Forward the user action to game state machine using the tokio channel
        // this kind works, but not sure if this is the right way to do this
        self.event_chan.send(req.inner().into()).await.unwrap();

        //...
    }

    type GameEventStream = ReceiverStream<Result<EventResponse, Status>>;

    async fn game_event(
        &self,
        req: Request<GameStartRequest>,
    ) -> Result<Response<Self::GameEventStream>, Status> {
        // ...
        // NOTE: BELOW CODE doesn't compile, but that what I was hopping to do
        loop{
            // Not compile, but that is what I was hopping to achieve
            // I need &mut here to be able to execute recv()

            // I can use Arc<Mutex<Receiver<T>>> here, but this doesn't feel right
            // Is this a correct way to do HERE?
            let Some(event) = self.broadcast_events.recv().await {
                // if event is private and not to this player,
                //   -> skip sending the event to the stream
                // else
                //   -> send the event to the stream
            }
        }
    }
}

// main.rs
#[tokio::main]
async fn main() {

    let (event_chan_tx, event_chan_rx) = tokio::mpsc::channel(100);
    let (broadcast_tx, broadcast_rx) = tokio::broadcast::channel(100);

    let state_machine = GameStateMachine::new(event_chan_rx, broadcast_tx);
    tokio::spawn(async move {
        state_machine.run().await;
    });

    let grpc = MyGrpcService{
        event_chan: event_chan_tx,
        broadcast_events: broadcast_rx
    };

    tonic::transport::Server::builder()
        .add_service(PlayerActionServer::new(grpc))
        .serve("[::1]:50051".to_socket_addrs().unwrap().next().unwrap())
        .await
        .unwrap();

    Ok(())

}

The reason it wants you to use a mutex is that your channel is an mpsc channel (multi-producer, single-consumer), but you are calling it from a &self method, so it can be called in parallel which would lead to multiple consumers.

1 Like

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.