I am having trouble sharing a tokio broadcast channel with an axum handler. I want to pass a tx/rx pair to my handler. My use case is:
The user calls the /game route, with a listen request. Start the listen request with the broadcast rx, and wait for a message. The user calls the /game route with a command request. The command request then sends the message through the broadcast tx, then returns to the user with an OK. Then the original listen request gets the message, and returns some data to the user. I tried to do this several ways, but I always got the same result: I send the listen request, and it waits for the message to be sent. But calling the command doesn't even reach the handler. What could be the reason for this?
app.rs:
use std::sync::{Arc, RwLock};
use axum::body::Body;
use axum::extract::Request;
use axum::http::StatusCode;
use axum::middleware::Next;
use axum::response::{IntoResponse, Response};
use axum::routing::{get, post};
use axum::{middleware, Extension, Router};
use fred::prelude::*;
use http_body_util::BodyExt;
use scc::{HashMap, Queue};
use serde::Serialize;
use sqlx::postgres::PgPool;
use tokio::sync::broadcast::Receiver;
use tokio::sync::{broadcast, mpsc};
use tracing::trace;
use crate::channels::parse_xml_multiple;
use crate::router::{client_castle, countries, friends, game, help, mobil};
use crate::users::ServerCommand;
use crate::village::start::friendly_game::ActiveSepRoom;
// #[derive(Serialize, Clone, Debug)]
// pub struct FriendlyRooms(HashMap<i32, ActiveSepRoom>);
//
// impl FriendlyRooms {
// pub fn new() -> Self {
// FriendlyRooms(HashMap::new())
// }
//
// pub fn insert(&mut self, key: i32, val: ActiveSepRoom) {
// self.0.insert(key, val).unwrap();
// }
//
// pub fn get(&self, key: &i32) {
// self.0.get(key).unwrap();
// }
// }
/// `scc::HashMap` from an `i32` key to that player's [`SharedPlayerState`].
/// `App::serve` currently inserts a single entry under its hard-coded `USER_ID`.
pub type SharedState = HashMap<i32, SharedPlayerState>;
/// Per-player flags and pending data. Handlers never touch this directly; they go through
/// the accessor methods on [`SharedPlayerState`], which wraps it in an `Arc`.
#[derive(Debug)]
struct PlayerState {
/// Read by `get_login`, written by `set_login`.
pub is_logged_in: RwLock<bool>,
/// Read by `get_listen_ready`, written by `set_listen_ready`.
pub is_listen_ready: RwLock<bool>,
/// Last command stored by `set_server_command`; `None` when nothing is stored.
pub server_command: RwLock<Option<ServerCommand>>,
/// Messages appended by `push_listen` and removed by `get_next_listen`.
pub listen_queue: Queue<String>,
}
/// Clonable handle to one player's [`PlayerState`]; all clones share the same state
/// through the inner `Arc`.
#[derive(Clone, Debug)]
pub struct SharedPlayerState(Arc<PlayerState>);

impl SharedPlayerState {
    /// Creates a state with both flags `false`, no server command and an empty listen queue.
    fn new() -> Self {
        Self(Arc::new(PlayerState {
            is_logged_in: RwLock::new(false),
            is_listen_ready: RwLock::new(false),
            server_command: RwLock::new(None),
            listen_queue: Queue::default(),
        }))
    }

    /// Returns the current login flag. Panics if the lock is poisoned.
    pub async fn get_login(&self) -> bool {
        let flag = self.0.is_logged_in.read().unwrap();
        *flag
    }

    /// Returns the current listen-ready flag. Panics if the lock is poisoned.
    pub async fn get_listen_ready(&self) -> bool {
        let flag = self.0.is_listen_ready.read().unwrap();
        *flag
    }

    /// Returns a clone of the stored server command, if any. Panics if the lock is poisoned.
    pub async fn get_server_command(&self) -> Option<ServerCommand> {
        let stored = self.0.server_command.read().unwrap();
        (*stored).clone()
    }

    /// Pops the next entry from the listen queue and returns it as a `String`,
    /// or `None` when the queue is empty.
    pub async fn get_next_listen(&self) -> Option<String> {
        let entry = self.0.listen_queue.pop()?;
        Some(entry.to_string())
    }

    /// Overwrites the login flag. Panics if the lock is poisoned.
    pub async fn set_login(&self, val: bool) {
        let mut flag = self.0.is_logged_in.write().unwrap();
        *flag = val;
    }

    /// Overwrites the listen-ready flag. Panics if the lock is poisoned.
    pub async fn set_listen_ready(&self, val: bool) {
        let mut flag = self.0.is_listen_ready.write().unwrap();
        *flag = val;
    }

    /// Replaces the stored server command. Panics if the lock is poisoned.
    pub async fn set_server_command(&self, val: Option<ServerCommand>) {
        let mut stored = self.0.server_command.write().unwrap();
        *stored = val;
    }

    /// Appends `msg` to the listen queue.
    pub async fn push_listen(&self, msg: String) {
        self.0.listen_queue.push(msg);
    }
}
/// Owns the sending half of a tokio broadcast channel. Only the sender is stored;
/// receivers are created on demand from it via `subscribe()`.
#[derive(Debug)]
struct PlayerChannel<T: Clone> {
    tx: broadcast::Sender<T>,
}

impl<T: Clone> PlayerChannel<T> {
    /// Creates the channel with a capacity of 8 messages.
    fn new() -> Self {
        // The receiver returned by `channel` is deliberately not kept; the leading
        // underscore silences the unused-variable warning.
        let (tx, _initial_rx) = broadcast::channel(8);
        Self { tx }
    }
}
#[derive(Clone)]
pub struct SharedPlayerChannel<T: Clone>(Arc<RwLock<PlayerChannel<T>>>);
impl<T: Clone> SharedPlayerChannel<T> {
pub fn new() -> Self {
SharedPlayerChannel(Arc::new(RwLock::new(PlayerChannel::new())))
}
pub async fn new_receiver(&self) -> Receiver<T> {
self.0.read().unwrap().tx.subscribe()
}
pub async fn send_message(&self, msg: T) -> Result<usize, broadcast::error::SendError<T>> {
self.0.read().unwrap().tx.send(msg)
}
pub async fn recv_message(mut receiver: Receiver<T>) -> Result<T, broadcast::error::RecvError> {
receiver.recv().await
}
pub async fn is_user_listening(&self) -> bool {
self.0.read().unwrap().tx.receiver_count() >= 1
}
}
/// Holds the two storage handles built by `App::new` and handed to the routers in `App::serve`.
pub struct App {
/// PostgreSQL pool, connected using the `DATABASE_URL` variable.
db: PgPool,
/// Redis pool, configured from the `TMP_DB_URL` variable.
tmp_db: RedisPool,
}
impl App {
pub async fn new() -> Result<Self, anyhow::Error> {
let db = PgPool::connect(&dotenvy::var("DATABASE_URL").expect("DATABASE_URL not defined!"))
.await?;
sqlx::migrate!().run(&db).await?;
let config =
RedisConfig::from_url(&dotenvy::var("TMP_DB_URL").expect("TMP_DB_URL not defined!"))
.expect("Failed to create redis config from url");
let tmp_db = Builder::from_config(config)
.with_connection_config(|config| {
config.connection_timeout = std::time::Duration::from_secs(10);
})
// use exponential backoff, starting at 100 ms and doubling on each failed attempt
// up to 30 sec
.set_policy(ReconnectPolicy::new_exponential(0, 100, 30_000, 2))
.build_pool(8)
.expect("Failed to create redis pool");
tmp_db.init().await.expect("Failed to connect to redis");
Ok(Self { db, tmp_db })
}
pub async fn serve(self) -> Result<(), AppError> {
// todo use a middleware instead
const USER_ID: i32 = 1;
// let friendly_rooms: FriendlyRooms = FriendlyRooms(HashMap::new());
let shared_state: SharedState = HashMap::new();
let val = SharedPlayerState::new();
let player_channel: SharedPlayerChannel<String> = SharedPlayerChannel::new();
shared_state.insert(USER_ID, val).unwrap();
let app = Router::new()
.route("/mobil.php", post(mobil))
.route("/dat/help.json", get(help))
.route("/client_countries.php", get(countries))
.route("/client_friends.php", post(friends))
.route("/client_castle.php", get(client_castle))
// .route("/client_extdata.php", get(extdata))
.layer(Extension(self.db.clone()))
.layer(Extension(self.tmp_db.clone()));
let user_state = shared_state.get(&USER_ID).unwrap().get().clone();
let game_router = Router::new()
.route("/game", post(game))
.route_layer(middleware::from_fn(xml_header_extractor))
.layer(Extension(self.db.clone()))
.layer(Extension(self.tmp_db.clone()))
.layer(Extension(user_state.clone()))
// .layer(Extension(friendly_rooms))
.layer(Extension(player_channel.clone()));
let merged = app.merge(game_router);
let listener = tokio::net::TcpListener::bind("0.0.0.0:8080").await?;
axum::serve(listener, merged.into_make_service())
.await
.map_err(|e| AppError::from(e))?;
Ok(())
}
}
async fn xml_header_extractor(request: Request, next: Next) -> Response {
println!("middleware called");
let (parts, body) = request.into_parts();
let bytes = body
.collect()
.await
.map_err(|err| (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()).into_response())
.unwrap()
.to_bytes();
let body = String::from_utf8_lossy(&bytes).to_string();
let mut lines: Vec<&str> = body.lines().collect();
println!("lines: {:?}", lines);
let xml_header_string = lines.remove(0);
let new_body = lines.get(0).unwrap().to_string();
let mut req = Request::from_parts(parts, Body::from(new_body));
let parsed_header = parse_xml_multiple(&xml_header_string).unwrap();
req.extensions_mut().insert(parsed_header);
let response = next.run(req).await;
response
}
/// Crate-wide error type: a thin wrapper around `anyhow::Error` that can be turned into an
/// HTTP response.
#[derive(Debug)]
pub(crate) struct AppError(anyhow::Error);
impl IntoResponse for AppError {
fn into_response(self) -> Response {
(
StatusCode::INTERNAL_SERVER_ERROR,
format!("Something went wrong: {}", self.0),
)
.into_response()
}
}
// This enables using `?` on functions that return `Result<_, anyhow::Error>` to turn them into
// `Result<_, AppError>`. That way you don't need to do that manually.
impl<E> From<E> for AppError
where
E: Into<anyhow::Error>,
{
fn from(err: E) -> Self {
Self(err.into())
}
}
router.rs:
use std::time::Duration;
use axum::{Extension, Json};
use fred::clients::RedisPool;
use scc::Queue;
use sqlx::PgPool;
use tokio::sync::{mpsc};
use tracing::{trace, warn};
use crate::app::{AppError, SharedPlayerChannel, SharedPlayerState};
use crate::cdn::countries::CountriesResponse;
use crate::channels::command::request::{CommandRoot, CommandType};
use crate::channels::command::response::CommandResponse;
use crate::channels::listen::request::ListenRoot;
use crate::channels::listen::response::ListenResponseType::VillageSetup;
use crate::channels::listen::response::{ListenResponse, ListenResponseHeader};
use crate::channels::BodyChannelType;
use crate::emulator::Emulator;
use crate::game_handlers::server_game_handler::ServerGameHandler;
use crate::menu::friend_list::external_data::ExternalFriendsRoot;
use crate::menu::friend_list::friends::FriendResponse;
use crate::menu::help::info_help::HelpResponse;
use crate::mobile::request::Mobile;
use crate::mobile::response::{LoginResponse, MobileResponse, PingResponse};
use crate::users::ServerCommand;
use crate::utils::{modified_xml_response, remove_root_tag};
use crate::village::castle::badges::CastleResponse;
use crate::village::setup::VillageSetupRoot;
use crate::village::start::friendly_game::ActiveSepRoom;
use crate::village::waithall::{GameMenuWaithall, Waithall};
/// Handler for `GET /dat/help.json`: replies with the emulated help payload as JSON.
pub async fn help() -> Json<HelpResponse> {
    // todo find out how to use this
    let payload = HelpResponse::emulate();
    Json(payload)
}
/// Handler for `GET /client_countries.php`: replies with the emulated countries payload as JSON.
pub async fn countries() -> Json<CountriesResponse> {
    let payload = CountriesResponse::emulate();
    Json(payload)
}
/// Handler for `POST /client_friends.php`: replies with the emulated friend list as JSON.
pub async fn friends() -> Json<FriendResponse> {
    let payload = FriendResponse::emulate();
    Json(payload)
}
/// Replies with the emulated friend list as JSON. The `/client_extdata.php` route in
/// `App::serve` is commented out, so this handler is not registered there.
pub async fn _extdata() -> Json<FriendResponse> {
    let payload = FriendResponse::emulate();
    Json(payload)
}
/// Handler for `POST /mobil.php`: answers a ping with `"pong"` and a mobile login with the
/// emulated login response.
pub async fn mobil(Json(payload): Json<Mobile>) -> Json<MobileResponse> {
    let reply = match payload {
        Mobile::Ping(_) => MobileResponse::Ping(PingResponse {
            message: String::from("pong"),
        }),
        Mobile::MobileLogin(_) => MobileResponse::Login(LoginResponse::emulate()),
    };
    Json(reply)
}
#[axum::debug_handler]
pub async fn game(
_db: Extension<PgPool>,
tmp_db: Extension<RedisPool>,
xml_header: Extension<BodyChannelType>,
player_state: Extension<SharedPlayerState>,
// friendly_rooms: Extension<FriendlyRooms>,
broadcast: Extension<SharedPlayerChannel<String>>,
body: String,
) -> Result<String, AppError> {
trace!("game handler called");
const GAME_ID: u32 = 1;
const PLAYER_ID: i32 = 1;
const PLAYER_NAME: &str = "xrtxn";
let body = format!("<ROOT>{}</ROOT>", body);
match xml_header.0 {
BodyChannelType::Command(comm) => {
let ser: CommandRoot = quick_xml::de::from_str(&body)?;
match ser.msg_type {
CommandType::Login(_) => {
// todo validate login
player_state.set_login(false).await;
player_state.set_listen_ready(false).await;
player_state.set_server_command(None).await;
Ok(modified_xml_response(&CommandResponse::ok(
PLAYER_ID, comm.mn,
))?)
}
CommandType::ChangeWaitHall(chw) => {
let msg;
match chw.waithall {
Waithall::Game => {
msg = quick_xml::se::to_string(&GameMenuWaithall::emulate())?;
}
Waithall::Village => {
msg = quick_xml::se::to_string(&VillageSetupRoot::emulate())?;
}
}
if broadcast.is_user_listening().await {
trace!("sending message to channel");
broadcast.send_message(msg).await.unwrap();
trace!("sent message to channel");
} else {
trace!("pushing to listen");
// player_state.push_listen(msg).await;
trace!("pushed to listen");
}
Ok(modified_xml_response(&CommandResponse::ok(
comm.client_id,
comm.mn,
))?)
}
CommandType::EnterGameLobby(_) => {
Ok(modified_xml_response(&CommandResponse::error())?)
}
CommandType::GetExternalData(_) => {
let msg = quick_xml::se::to_string(&ExternalFriendsRoot::emulate())?;
Ok(remove_root_tag(format!(
"{}\n{}",
quick_xml::se::to_string(&CommandResponse::ok(comm.client_id, comm.mn))?,
msg
)))
}
CommandType::ExitCurrentRoom(_) => {
warn!("Encountered stub ExitCurrentRoom, this response may work or may not");
Ok(modified_xml_response(&CommandResponse::ok(
comm.client_id,
comm.mn,
))?)
}
CommandType::AddFriendlyRoom(room) => {
// todo handle other cases
if room.opp1 == -1 && room.opp2 == -1 {
// todo get next number
let room_number = 1;
// friendly_rooms.insert(
// room_number,
// ActiveSepRoom::new_bot_room(PLAYER_ID, PLAYER_NAME),
// );
//
// let xml = quick_xml::se::to_string(&friendly_rooms.0)?;
// broadcast.send_message(xml).await.unwrap();
} else {
return Ok(modified_xml_response(&CommandResponse::error())?);
}
Ok(modified_xml_response(&CommandResponse::ok(
comm.client_id,
comm.mn,
))?)
}
CommandType::StartTriviador(_) => {
tokio::spawn(async move {
ServerGameHandler::new_friendly(&tmp_db, GAME_ID).await;
});
Ok(modified_xml_response(&CommandResponse::ok(
comm.client_id,
comm.mn,
))?)
}
CommandType::GamePlayerReady => {
// player_state.set_listen_ready(true).await;
Ok(modified_xml_response(&CommandResponse::ok(
comm.client_id,
comm.mn,
))?)
}
CommandType::SelectArea(area_selection) => {
player_state
.set_server_command(Some(ServerCommand::SelectArea(area_selection.area)))
.await;
Ok(modified_xml_response(&CommandResponse::ok(
comm.client_id,
comm.mn,
))?)
}
CommandType::QuestionAnswer(ans) => {
player_state
.set_server_command(Some(ServerCommand::QuestionAnswer(ans.answer)))
.await;
Ok(modified_xml_response(&CommandResponse::ok(
comm.client_id,
comm.mn,
))?)
}
CommandType::PlayerTipResponse(tip) => {
player_state
.set_server_command(Some(ServerCommand::TipAnswer(tip.tip)))
.await;
Ok(modified_xml_response(&CommandResponse::ok(
comm.client_id,
comm.mn,
))?)
}
}
}
BodyChannelType::Listen(lis) => {
let ser: ListenRoot = quick_xml::de::from_str(&body)?;
player_state.set_listen_ready(ser.listen.is_ready).await;
trace!("listen state is {}", player_state.get_listen_ready().await);
if !player_state.get_login().await {
player_state.set_login(true).await;
player_state.set_listen_ready(false).await;
return Ok(modified_xml_response(&ListenResponse::new(
ListenResponseHeader {
client_id: lis.client_id,
mn: lis.mn,
result: 0,
},
VillageSetup(VillageSetupRoot::emulate()),
))?);
}
let next_listen = player_state.get_next_listen().await;
if next_listen.is_none() {
trace!("subbed to recv message");
let rx = broadcast.new_receiver().await;
trace!("got new rx");
let msg = SharedPlayerChannel::recv_message(rx).await.unwrap();
trace!("recv'd message");
Ok(format!(
"{}\n{}",
quick_xml::se::to_string(&ListenResponseHeader {
client_id: PLAYER_ID,
mn: lis.mn,
result: 0,
})?,
remove_root_tag(msg)
))
} else {
Ok(format!(
"{}\n{}",
quick_xml::se::to_string(&ListenResponseHeader {
client_id: PLAYER_ID,
mn: lis.mn,
result: 0,
})?,
remove_root_tag(next_listen.unwrap())
))
}
}
}
}
/// Handler for `GET /client_castle.php`: replies with the emulated castle payload as JSON.
pub async fn client_castle() -> Json<CastleResponse> {
    let payload = CastleResponse::emulate();
    Json(payload)
}
Here is the full repo if you want to check it out:
The reproduction command is in the GitHub repository.
I have been stuck here for days...