My task blocks all instances of mpsc::Sender (that I use in other tasks), so receiver never receive any output from it (never recv). This happens after I set is_casting
flag so cast_ai.manage()
possible to send first packet. I left some annotations in code below, please see it in comments
So, this is my task:
fn handle_ai(
&mut self,
output_sender: Sender<PacketData>,
message_income: MessageIncome
) -> JoinHandle<()> {
let session = Arc::clone(&self.session);
let data_storage = Arc::clone(&self.data_storage);
let mut cast_ai = CastAI::new();
tokio::spawn(async move {
loop {
let input = AIManagerInput {
session: Arc::clone(&session),
data_storage: Arc::clone(&data_storage),
output_sender: output_sender.clone(),
};
cast_ai.manage(input).await;
// when I add sleep(...).await here sender become unlocked
}
})
}
this is how CastAI
implemented:
pub struct AI {
state: StateFlags, // bitflags! instance
counter: u64,
}
impl AI {
pub fn new() -> Self {
Self {
state: StateFlags::NONE,
counter: 0,
}
}
pub async fn manage(&mut self, mut input: AIManagerInput) {
let state_flags = {
let guard = input.session.lock().unwrap();
guard.state_flags.clone()
};
let action_flags = {
let guard = input.session.lock().unwrap();
guard.action_flags.clone()
};
let is_in_world = state_flags.contains(StateFlags::IN_WORLD);
let is_casting = action_flags.contains(ActionFlags::IS_CASTING);
if is_in_world {
if is_casting {
if self.state.is_empty() {
let spells_map: HashSet<u32> = {
let guard = input.session.lock().unwrap();
guard.spells_map.clone()
};
let spell_id: u32 = self.get_spell_id(spells_map);
let packet_data = self.cast_spell(spell_id).unwrap();
input.output_sender.send(packet_data.clone()).await.unwrap();
// if I remove this line sender will work (it will send packets non-stop)
self.state.set(StateFlags::WAITING_FOR_RESULT, true);
}
}
}
}
}
This is full code:
pub struct Client {
_reader: Arc<Mutex<Option<Reader>>>,
_writer: Arc<Mutex<Option<Writer>>>,
_warden_crypt: Arc<SyncMutex<Option<WardenCrypt>>>,
_income_message_pipe: Arc<SyncMutex<IncomeMessagePipe>>,
_outcome_message_pipe: Arc<SyncMutex<OutcomeMessagePipe>>,
session: Arc<SyncMutex<Session>>,
data_storage: Arc<SyncMutex<DataStorage>>,
client_flags: Arc<SyncMutex<ClientFlags>>,
}
impl Client {
pub fn new() -> Self {
Self {
_reader: Arc::new(Mutex::new(None)),
_writer: Arc::new(Mutex::new(None)),
_warden_crypt: Arc::new(SyncMutex::new(None)),
_income_message_pipe: Arc::new(SyncMutex::new(IncomeMessagePipe::new())),
_outcome_message_pipe: Arc::new(SyncMutex::new(OutcomeMessagePipe::new())),
session: Arc::new(SyncMutex::new(Session::new())),
data_storage: Arc::new(SyncMutex::new(DataStorage::new())),
client_flags: Arc::new(SyncMutex::new(ClientFlags::NONE)),
}
}
pub async fn connect(&mut self, host: &str, port: u16) -> Result<(), Error> {
let mut income_pipe = self._income_message_pipe.lock().unwrap();
return match Self::connect_inner(host, port).await {
Ok(stream) => {
Self::set_stream_halves(
stream,
Arc::clone(&self._reader),
Arc::clone(&self._writer),
None,
Arc::clone(&self._warden_crypt),
).await;
match self.session.lock().unwrap().set_config(host) {
Ok(_) => {},
Err(err) => {
income_pipe.message_income.send_error_message(err.to_string());
}
}
income_pipe.message_income.send_success_message(
format!("Connected to {}:{}", host, port)
);
Ok(())
},
Err(err) => {
income_pipe.message_income.send_error_message(
format!("Cannot connect: {}", err.to_string())
);
Err(err)
},
}
}
async fn connect_inner(host: &str, port: u16) -> Result<TcpStream, Error> {
let addr = format!("{}:{}", host, port);
match TcpStream::connect(&addr).await {
Ok(stream) => Ok(stream),
Err(err) => Err(err),
}
}
async fn set_stream_halves(
stream: TcpStream,
reader: Arc<Mutex<Option<Reader>>>,
writer: Arc<Mutex<Option<Writer>>>,
session_key: Option<Vec<u8>>,
warden_crypt: Arc<SyncMutex<Option<WardenCrypt>>>,
) {
let (rx, tx) = stream.into_split();
if session_key.is_none() {
*reader.lock().await = Some(Reader::new(rx));
*writer.lock().await = Some(Writer::new(tx));
} else {
let session_key = session_key.unwrap();
*warden_crypt.lock().unwrap() = Some(WardenCrypt::new(&session_key));
let mut _reader = Reader::new(rx);
_reader.init(&session_key, Arc::clone(&warden_crypt));
*reader.lock().await = Some(_reader);
let mut _writer = Writer::new(tx);
_writer.init(&session_key);
*writer.lock().await = Some(_writer);
}
}
pub async fn run(&mut self) {
const BUFFER_SIZE: usize = 500;
let (signal_sender, signal_receiver) = mpsc::channel::<Signal>(1);
let (input_sender, input_receiver) = mpsc::channel::<Vec<u8>>(BUFFER_SIZE);
let (output_sender, output_receiver) = mpsc::channel::<PacketData>(BUFFER_SIZE);
let message_income = self._income_message_pipe.lock().unwrap().message_income.clone();
let dialog_income = self._income_message_pipe.lock().unwrap().dialog_income.clone();
let username = {
let guard = self.session.lock().unwrap();
let config = guard.get_config().unwrap();
config.connection_data.username.clone()
};
{
let mut guard = self._income_message_pipe.lock().unwrap();
guard.message_income.send_client_message(
format!("LOGIN_CHALLENGE as {}", &username)
);
}
output_sender.send(login_challenge(&username)).await.unwrap();
let (ws_sender, ws_receiver) = WSConnection::create().await;
let ws_sender = Arc::new(Mutex::new(ws_sender));
let ws_receiver = Arc::new(Mutex::new(ws_receiver));
{
let mut sender = ws_sender.lock().await;
let mut receiver = ws_receiver.lock().await;
let mut guard = self.data_storage.lock().unwrap();
let result = QueryRegistry::players(&mut *sender, &mut *receiver)
.get_players()
.get_all().await;
guard.players_map = match result {
Ok(players) => players,
_ => BTreeMap::new(),
};
}
join_all(vec![
self.handle_ui_render(),
self.handle_ui_input(),
self.handle_ui_output(),
self.handle_ai(output_sender.clone(), message_income.clone()),
self.handle_read(input_sender.clone(), signal_receiver, message_income.clone()),
self.handle_write(output_receiver, message_income.clone()),
self.handle_packet(
input_receiver,
signal_sender.clone(),
output_sender.clone(),
message_income.clone(),
dialog_income.clone(),
Arc::clone(&ws_sender),
Arc::clone(&ws_receiver),
),
]).await;
}
fn handle_ui_input(&mut self) -> JoinHandle<()> {
let event_income = self._income_message_pipe.lock().unwrap().event_income.clone();
tokio::task::spawn_blocking(move || {
let mut ui_input = UIInput::new(event_income);
loop {
ui_input.handle();
}
})
}
fn handle_ui_render(&mut self) -> JoinHandle<()> {
let income_message_pipe = Arc::clone(&self._income_message_pipe);
let dialog_outcome = self._outcome_message_pipe.lock().unwrap().dialog_outcome.clone();
let flag_outcome = self._outcome_message_pipe.lock().unwrap().flag_outcome.clone();
let client_flags = Arc::clone(&self.client_flags);
tokio::task::spawn_blocking(move || {
let mut ui = UI::new(
CrosstermBackend::new(std::io::stdout()),
UIOutputOptions {
dialog_outcome,
flag_outcome,
},
);
loop {
let client_flags = {
let guard = client_flags.lock().unwrap();
guard.clone()
};
ui.render(UIRenderOptions {
message: income_message_pipe.lock().unwrap().recv(),
client_flags: &client_flags,
});
}
})
}
fn handle_ui_output(&mut self) -> JoinHandle<()> {
let outcome_message_pipe = Arc::clone(&self._outcome_message_pipe);
let session = Arc::clone(&self.session);
let client_flags = Arc::clone(&self.client_flags);
tokio::task::spawn_blocking(move || {
loop {
if let Ok(message) = outcome_message_pipe.lock().unwrap().recv() {
let client_flags = &mut *client_flags.lock().unwrap();
let mut ui_output = UIOutput::new(Arc::clone(&session), client_flags);
ui_output.handle(message);
}
}
})
}
fn handle_ai(
&mut self,
output_sender: Sender<PacketData>,
message_income: MessageIncome
) -> JoinHandle<()> {
let session = Arc::clone(&self.session);
let data_storage = Arc::clone(&self.data_storage);
let mut cast_ai = CastAI::new();
tokio::spawn(async move {
loop {
let input = AIManagerInput {
session: Arc::clone(&session),
data_storage: Arc::clone(&data_storage),
output_sender: output_sender.clone(),
};
cast_ai.manage(input).await;
}
})
}
fn handle_packet(
&mut self,
mut input_receiver: Receiver<Vec<u8>>,
signal_sender: Sender<Signal>,
output_sender: Sender<PacketData>,
mut message_income: MessageIncome,
dialog_income: DialogIncome,
ws_sender: Arc<Mutex<WSSender<Compat<TcpStream>>>>,
ws_receiver: Arc<Mutex<WSReceiver<Compat<TcpStream>>>>,
) -> JoinHandle<()> {
let session = Arc::clone(&self.session);
let reader = Arc::clone(&self._reader);
let writer = Arc::clone(&self._writer);
let warden_crypt = Arc::clone(&self._warden_crypt);
let client_flags = Arc::clone(&self.client_flags);
let data_storage = Arc::clone(&self.data_storage);
let ws_sender = Arc::clone(&ws_sender);
let ws_receiver = Arc::clone(&ws_receiver);
tokio::spawn(async move {
loop {
if let Some(packet) = input_receiver.recv().await {
let processors = {
let guard = client_flags.lock().unwrap();
let connected_to_realm = guard.contains(ClientFlags::IS_CONNECTED_TO_REALM);
match connected_to_realm {
true => Self::get_realm_processors(),
false => Self::get_login_processors(),
}
};
let session_key = {
let guard = session.lock().unwrap();
guard.session_key.clone()
};
let mut input = HandlerInput {
session: Arc::clone(&session),
data: Some(&packet),
data_storage: Arc::clone(&data_storage),
message_income: message_income.clone(),
dialog_income: dialog_income.clone(),
ws_sender: Arc::clone(&ws_sender),
ws_receiver: Arc::clone(&ws_receiver),
};
let handler_list = processors
.iter()
.map(|processor| processor(&mut input))
.flatten()
.collect::<ProcessorResult>();
for mut handler in handler_list {
let response = handler.handle(&mut input).await;
match response {
Ok(output) => {
match output {
HandlerOutput::Data((opcode, header, body)) => {
let body = match opcode {
Opcode::CMSG_WARDEN_DATA => {
warden_crypt.lock().unwrap().as_mut().unwrap().encrypt(&body)
},
_ => body,
};
let packet = [header, body].concat();
output_sender.send((opcode, packet)).await.unwrap();
},
HandlerOutput::ConnectionRequest(host, port) => {
match Self::connect_inner(&host, port).await {
Ok(stream) => {
signal_sender.send(Signal::Reconnect).await.unwrap();
Self::set_stream_halves(
stream,
Arc::clone(&reader),
Arc::clone(&writer),
session_key.clone(),
Arc::clone(&warden_crypt),
).await;
message_income.send_success_message(
format!("Connected to {}:{}", host, port)
);
client_flags.lock().unwrap().set(
ClientFlags::IS_CONNECTED_TO_REALM,
true,
);
},
Err(err) => {
message_income.send_error_message(
err.to_string()
);
}
}
},
HandlerOutput::Freeze => {
{
client_flags.lock().unwrap().set(
ClientFlags::IN_FROZEN_MODE,
true
);
}
loop {
let frozen_mode = client_flags
.lock()
.unwrap()
.contains(ClientFlags::IN_FROZEN_MODE);
if !frozen_mode {
break;
}
}
},
HandlerOutput::Void => {},
};
},
Err(err) => {
message_income.send_error_message(err.to_string());
},
};
}
}
}
})
}
fn handle_write(
&mut self,
mut output_receiver: Receiver<PacketData>,
mut message_income: MessageIncome,
) -> JoinHandle<()> {
let writer = Arc::clone(&self._writer);
tokio::spawn(async move {
loop {
if let Some((opcode, packet)) = output_receiver.recv().await {
message_income.send_error_message(
format!("{} received: {}", opcode, packet.is_empty())
);
if !packet.is_empty() {
message_income.send_client_message(
Opcode::get_opcode_name(opcode)
);
match Self::write_packet(&writer, packet).await {
Ok(bytes_sent) => {
message_income.send_debug_message(
format!("{} bytes sent", bytes_sent)
);
},
Err(err) => {
message_income.send_client_message(err.to_string());
}
}
}
}
}
})
}
fn handle_read(
&mut self,
input_sender: Sender<Vec<u8>>,
mut signal_receiver: Receiver<Signal>,
mut message_income: MessageIncome,
) -> JoinHandle<()> {
let reader = Arc::clone(&self._reader);
tokio::spawn(async move {
loop {
tokio::select! {
signal = signal_receiver.recv() => {},
result = Self::read_packet(&reader) => {
match result {
Ok(packet) => {
input_sender.send(packet).await.unwrap();
},
Err(err) => {
message_income.send_error_message(err.to_string());
sleep(Duration::from_secs(1)).await;
}
}
},
};
}
})
}
async fn read_packet(reader: &Arc<Mutex<Option<Reader>>>) -> Result<Vec<u8>, Error> {
let mut error = Error::new(ErrorKind::NotFound, "Not connected to TCP");
if let Some(reader) = &mut *reader.lock().await {
let result = reader.read().await;
match result {
Ok(packet) => {
if !packet.is_empty() {
return Ok(packet);
}
}
Err(err) => {
error = err;
},
}
}
Err(error)
}
async fn write_packet(
writer: &Arc<Mutex<Option<Writer>>>,
packet: Vec<u8>
) -> Result<usize, Error> {
let mut error = Error::new(ErrorKind::NotFound, "Not connected to TCP");
match &mut *writer.lock().await {
Some(writer) => {
match writer.write(&packet).await {
Ok(bytes_sent) => {
return Ok(bytes_sent);
},
Err(err) => {
error = err;
}
};
},
_ => {},
};
Err(error)
}
fn get_login_processors() -> Vec<ProcessorFunction> {
return vec![
Box::new(AuthProcessor::process_input),
];
}
fn get_realm_processors() -> Vec<ProcessorFunction> {
return vec![
Box::new(AuraProcessor::process_input),
Box::new(CharactersProcessor::process_input),
Box::new(ChatProcessor::process_input),
Box::new(GameObjectProcessor::process_input),
Box::new(InventoryProcessor::process_input),
Box::new(MovementProcessor::process_input),
Box::new(PlayerProcessor::process_input),
Box::new(RealmProcessor::process_input),
Box::new(SpellProcessor::process_input),
Box::new(WardenProcessor::process_input),
];
}
}
Could somebody explain why sender blocks and how to fix this issue ? For me it's pretty unpredictable behavior.