Hi everyone!
I've run into a deadlock in my app. The app is a game client that connects to a game server. I want to forward every packet I read to my external logging server, so I added an extra queue, `mpsc::channel::<IncomingPacket>(BUFFER_SIZE)`, and use it to redirect all incoming packets to the `handle_external_write` task. The app first connects to the login server and, after the auth process, connects to the world server.
When the client reconnects, it replaces its `TcpStream` with a new one. For this purpose I use the `Self::set_stream_halves` method:
/// Splits `stream` and stores a fresh `Reader` and `Writer` in the shared slots.
///
/// With a session key, `warden_crypt` is re-initialised from that key and both halves are
/// created with the `true` flag plus a `Decryptor`/`Encryptor` built from the same key.
/// Without a key, each half gets its own empty crypt slot, the `false` flag and no cipher.
/// In both cases the reader slot is replaced before the writer slot.
async fn set_stream_halves(
    stream: TcpStream,
    reader: Arc<Mutex<Option<Reader>>>,
    writer: Arc<Mutex<Option<Writer>>>,
    session_key: Option<Vec<u8>>,
    warden_crypt: Arc<SyncMutex<Option<WardenCrypt>>>,
) {
    let (read_half, write_half) = stream.into_split();

    if let Some(key) = session_key {
        *warden_crypt.lock().unwrap() = Some(WardenCrypt::new(&key));

        // Each value is built first and stored afterwards, so the slot is only locked
        // for the assignment itself.
        let decryptor = Some(Decryptor::new(&key));
        let fresh_reader = Reader::new(read_half, Arc::clone(&warden_crypt), true, decryptor);
        // HERE IS DEADLOCK
        *reader.lock().await = Some(fresh_reader);

        let encryptor = Some(Encryptor::new(&key));
        let fresh_writer = Writer::new(write_half, Arc::clone(&warden_crypt), true, encryptor);
        *writer.lock().await = Some(fresh_writer);
    } else {
        let fresh_reader = Reader::new(read_half, Arc::new(SyncMutex::new(None)), false, None);
        *reader.lock().await = Some(fresh_reader);

        let fresh_writer = Writer::new(write_half, Arc::new(SyncMutex::new(None)), false, None);
        *writer.lock().await = Some(fresh_writer);
    }
}
The deadlock appeared after I added this line to `handle_read`:
// Queue sender that forwards each incoming packet to the handle_external_write task.
incoming_sender.send(packet).await?;
and after I added the `handle_external_write` task. Without this extra queue and this extra task everything works as expected.
In `handle_external_write`, `incoming_receiver` hangs on `.await` because nothing has been sent yet:
/// Spawns the task that forwards incoming packets to the external logging server.
///
/// The task runs in three phases:
/// 1. Until `signal_receiver.recv()` completes, packets taken from `incoming_receiver`
///    are discarded.
/// 2. It connects to `127.0.0.1:3788`, retrying every 10 seconds until that succeeds.
/// 3. Every further packet is written to that connection as its header bytes followed by
///    its body.
///
/// The task ends with `Ok(())` once the packet channel is closed, and with an error when a
/// write to the logging server fails.
fn handle_external_write(
    &mut self,
    mut incoming_receiver: Receiver<IncomingPacket>,
    mut signal_receiver: BroadcastReceiver<Signal>,
) -> Task {
    tokio::spawn(async move {
        loop {
            tokio::select! {
                _ = signal_receiver.recv() => {
                    break;
                },
                dropped = incoming_receiver.recv() => {
                    // `None` means the packet channel is closed. A closed channel is
                    // ready on every poll, so staying in this loop would spin; there is
                    // nothing left to forward, so finish instead.
                    if dropped.is_none() {
                        return Ok(());
                    }
                },
            }
        }

        let mut stream = loop {
            match TcpStream::connect("127.0.0.1:3788").await {
                Ok(stream) => break stream,
                Err(_) => tokio::time::sleep(Duration::from_secs(10)).await,
            }
        };

        // HANGS HERE
        while let Some(incoming_packet) = incoming_receiver.recv().await {
            let IncomingPacket { header: mut packet, body, .. } = incoming_packet;
            packet.extend_from_slice(&body);
            // `write` may accept only part of the buffer; `write_all` keeps going until
            // the whole packet has been handed to the socket.
            stream.write_all(&packet).await?;
        }

        Ok(())
    })
}
This is the full example:
/// Game client state shared between the background tasks spawned by `run`.
pub struct Client {
/// Slot for the `Reader` of the current connection; starts as `None` and is filled by
/// `set_stream_halves`.
_reader: Arc<Mutex<Option<Reader>>>,
/// Slot for the `Writer` of the current connection; starts as `None` and is filled by
/// `set_stream_halves`.
_writer: Arc<Mutex<Option<Writer>>>,
/// Warden crypt state; starts as `None` and is set by `set_stream_halves` when a session
/// key is supplied.
_warden_crypt: Arc<SyncMutex<Option<WardenCrypt>>>,
/// Session shared with packet handlers; `run` stores the config in it and `handle_output`
/// reads the SRP session key from it.
session: Arc<Mutex<Session>>,
/// Data storage handed to every packet handler through `HandlerInput`.
data_storage: Arc<SyncMutex<DataStorage>>,
}
impl Client {
/// Builds a client that is not connected yet: empty reader, writer and crypt slots and a
/// fresh `Session`.
///
/// `options.data_storage` is used when provided; otherwise a default `DataStorage` is
/// created and wrapped for sharing.
pub fn new(options: CreateOptions) -> Self {
    let reader_slot = Arc::new(Mutex::new(None));
    let writer_slot = Arc::new(Mutex::new(None));
    let warden_slot = Arc::new(SyncMutex::new(None));
    let session = Arc::new(Mutex::new(Session::new()));
    let data_storage = match options.data_storage {
        Some(shared) => shared,
        None => Arc::new(SyncMutex::new(DataStorage::default())),
    };

    Self {
        _reader: reader_slot,
        _writer: writer_slot,
        _warden_crypt: warden_slot,
        session,
        data_storage,
    }
}
/// Opens a TCP connection to `host` on `port`.
///
/// Host and port are passed to `TcpStream::connect` as a pair rather than as a formatted
/// `"host:port"` string, so an IPv6 literal such as `::1` is accepted as well.
///
/// # Errors
/// Returns the error reported by `TcpStream::connect` unchanged.
async fn connect_inner(host: &str, port: u16) -> Result<TcpStream, Error> {
    TcpStream::connect((host, port)).await
}
/// Splits `stream` and stores a fresh `Reader` and `Writer` in the shared slots.
///
/// With a session key, `warden_crypt` is re-initialised from that key and both halves are
/// created with the `true` flag plus a `Decryptor`/`Encryptor` built from the same key.
/// Without a key, each half gets its own empty crypt slot, the `false` flag and no cipher.
/// In both cases the reader slot is replaced before the writer slot.
async fn set_stream_halves(
    stream: TcpStream,
    reader: Arc<Mutex<Option<Reader>>>,
    writer: Arc<Mutex<Option<Writer>>>,
    session_key: Option<Vec<u8>>,
    warden_crypt: Arc<SyncMutex<Option<WardenCrypt>>>,
) {
    let (read_half, write_half) = stream.into_split();

    if let Some(key) = session_key {
        *warden_crypt.lock().unwrap() = Some(WardenCrypt::new(&key));

        // Each value is built first and stored afterwards, so the slot is only locked
        // for the assignment itself.
        let decryptor = Some(Decryptor::new(&key));
        let fresh_reader = Reader::new(read_half, Arc::clone(&warden_crypt), true, decryptor);
        // THIS MUTEX IS LOCKED AFTER ConnectionRequest in handle_output task
        *reader.lock().await = Some(fresh_reader);

        let encryptor = Some(Encryptor::new(&key));
        let fresh_writer = Writer::new(write_half, Arc::clone(&warden_crypt), true, encryptor);
        *writer.lock().await = Some(fresh_writer);
    } else {
        let fresh_reader = Reader::new(read_half, Arc::new(SyncMutex::new(None)), false, None);
        *reader.lock().await = Some(fresh_reader);

        let fresh_writer = Writer::new(write_half, Arc::new(SyncMutex::new(None)), false, None);
        *writer.lock().await = Some(fresh_writer);
    }
}
/// Connects to the configured server, wires up all channels and features, spawns the
/// background tasks and waits until every one of them has finished.
///
/// # Errors
/// Fails when the env config cannot be loaded, when the initial connection cannot be
/// established, when the session config cannot be set, when a status message cannot be
/// broadcast, or when a feature fails to provide its tasks.
pub async fn run(&mut self, options: RunOptions<'_>) -> anyhow::Result<()> {
let RunOptions { account, config_path, dotenv_path, external_features } = options;
let EnvConfig { host, port } = EnvConfig::new(
EnvConfigParams { dotenv_path }
)?;
let notify = Arc::new(Notify::new());
// Signals sent by `handle_output`; received by `handle_read` and `handle_external_write`.
let (signal_sender, signal_receiver) = broadcast::<Signal>(1);
// Outgoing packets: produced via `handle_output`, consumed by `handle_write`.
let (output_sender, output_receiver) = mpsc::channel::<OutgoingPacket>(BUFFER_SIZE);
// Queue that carries every packet read by `handle_read` to `handle_external_write`,
// which forwards it to the logging server.
let (incoming_sender, incoming_receiver) = mpsc::channel::<IncomingPacket>(BUFFER_SIZE);
// Handler output messages shared between features and the tasks below.
let (query_sender, query_receiver) = broadcast::<HandlerOutput>(BUFFER_SIZE);
// The first connection has no session key yet, so the halves are installed without one.
match Self::connect_inner(&host, port).await {
Ok(stream) => {
Self::set_stream_halves(
stream,
Arc::clone(&self._reader),
Arc::clone(&self._writer),
None,
Arc::clone(&self._warden_crypt),
).await;
self.session.lock().await.set_config(&host, account, config_path)?;
query_sender.broadcast(
HandlerOutput::SuccessMessage(
format!("Connected to {}:{}", host, port),
None,
)
).await?;
Ok(())
}
Err(err) => {
// Report the failure to listeners first, then abort `run` with the same error.
query_sender.broadcast(
HandlerOutput::ErrorMessage(format!("Cannot connect: {}", err), None)
).await?;
Err(err)
}
}?;
#[allow(unused_mut)]
let mut features: Vec<Box<dyn Feature>> = external_features;
// ...
for feature in &mut features {
feature.set_broadcast_channel(query_sender.clone(), query_receiver.clone());
}
// Feature tasks are collected first; the client's own tasks are appended after them.
let mut all_tasks = vec![];
for feature in features.iter_mut() {
all_tasks.extend(feature.get_tasks()?);
}
// NOTE(review): only clones of `signal_sender`, `output_sender` and `incoming_sender` are
// handed out below, so the originals stay alive until `run` returns. Presumably this keeps
// those channels open even after the task holding the clone has ended - confirm that no
// task relies on channel closure to stop.
all_tasks.extend(vec![
self.handle_read(
signal_receiver.clone(),
query_sender.clone(),
incoming_sender.clone(),
notify.clone(),
features,
),
self.handle_output(
signal_sender.clone(),
output_sender.clone(),
query_sender.clone(),
query_receiver,
notify.clone(),
),
self.handle_write(
output_receiver,
query_sender,
),
self.handle_external_write(
incoming_receiver,
signal_receiver,
),
]);
// The joined results are discarded here, so an error returned by a task is not reported.
join_all(all_tasks).await;
Ok(())
}
/// Spawns the task that reads packets from the current connection and dispatches them.
///
/// The task first collects processors and one-time handler maps from `features` and runs
/// the initial processors for `Opcode::LOGIN_CHALLENGE` with empty data. It then loops:
///
/// * When `signal_receiver.recv()` completes, dispatch switches from the login processors
///   to the realm processors. This happens at most once; later signals only interrupt the
///   pending read so that the loop starts a new one.
/// * When a packet is read, its handlers are looked up by opcode (regular processors plus
///   any one-time handlers, which are removed on use) and invoked. A packet without any
///   handler is broadcast as a hex dump. Finally the packet is queued on `incoming_sender`.
///
/// The task ends with an error when broadcasting, a handler call or the queue send fails.
fn handle_read(
    &mut self,
    mut signal_receiver: BroadcastReceiver<Signal>,
    query_sender: BroadcastSender<HandlerOutput>,
    incoming_sender: Sender<IncomingPacket>,
    notify: Arc<Notify>,
    features: Vec<Box<dyn Feature>>,
) -> Task {
    let reader = Arc::clone(&self._reader);
    let session = Arc::clone(&self.session);
    let data_storage = Arc::clone(&self.data_storage);

    tokio::spawn(async move {
        let mut realm_processors = vec![];
        let mut processors = vec![];
        let mut one_time_handler_maps = vec![];
        let mut initial_processors = vec![];

        for feature in features {
            realm_processors.extend(feature.get_realm_processors());
            processors.extend(feature.get_login_processors());
            one_time_handler_maps.extend(feature.get_one_time_handler_maps());
            initial_processors.extend(feature.get_initial_processors());
        }

        // Held back until the first signal, then swapped in for the login processors.
        let mut realm_processors = Some(realm_processors);

        let handler_list = initial_processors
            .iter()
            .flat_map(|processor| processor(Opcode::LOGIN_CHALLENGE as u16))
            .collect::<ProcessorResult>();

        Self::call_handlers(
            handler_list, &query_sender, &notify, HandlerInput {
                session: Arc::clone(&session),
                data: vec![],
                data_storage: Arc::clone(&data_storage),
                opcode: Opcode::LOGIN_CHALLENGE as u16,
            },
        ).await?;

        // Cleared once `recv` reports an error: a failed channel would otherwise make
        // this branch win every iteration and starve the read branch.
        let mut signals_open = true;

        loop {
            tokio::select! {
                signal = signal_receiver.recv(), if signals_open => {
                    signals_open = signal.is_ok();
                    // As before, the first completion switches to the realm processors.
                    // Later completions find `None` and no longer panic.
                    if let Some(next) = realm_processors.take() {
                        processors = next;
                    }
                },
                result = Self::read_packet(Arc::clone(&reader)) => {
                    match result {
                        Ok(packet) => {
                            let IncomingPacket { opcode, body, .. } = packet.clone();

                            let input = HandlerInput {
                                session: Arc::clone(&session),
                                data: body,
                                data_storage: Arc::clone(&data_storage),
                                opcode,
                            };

                            let mut handler_list = processors
                                .iter()
                                .flat_map(|processor| processor(opcode))
                                .collect::<ProcessorResult>();

                            for map in one_time_handler_maps.iter_mut() {
                                if let Some(handlers) = map.remove(&opcode) {
                                    handler_list.extend(handlers)
                                }
                            }

                            // in this case we just show the raw packet (hex)
                            if handler_list.is_empty() {
                                let opcode_name = Opcode::get_opcode_name(input.opcode as u32)
                                    .unwrap_or_else(|| {
                                        format!("Unknown opcode: {}", input.opcode)
                                    });

                                query_sender.broadcast(HandlerOutput::ResponseMessage(
                                    opcode_name,
                                    Some(encode_hex(&input.data)),
                                )).await?;
                            }

                            Self::call_handlers(
                                handler_list, &query_sender, &notify, input
                            ).await?;

                            // NOTE(review): this await presumably waits while the queue
                            // is full, which would also pause reading from the socket -
                            // confirm the consumer always keeps up.
                            incoming_sender.send(packet).await?;
                        },
                        Err(err) => {
                            // ...
                        }
                    }
                },
            }
        }
    })
}
/// Spawns the task that reacts to `HandlerOutput` messages received on `query_receiver`.
///
/// The visible part handles `HandlerOutput::ConnectionRequest(host, port)`: it opens a new
/// connection, broadcasts `Signal::Reconnect`, copies the SRP session key out of the
/// session and installs the new stream halves with that key.
fn handle_output(
&mut self,
signal_sender: BroadcastSender<Signal>,
output_sender: Sender<OutgoingPacket>,
query_sender: BroadcastSender<HandlerOutput>,
mut query_receiver: BroadcastReceiver<HandlerOutput>,
notify: Arc<Notify>,
) -> Task {
let session = Arc::clone(&self.session);
let reader = Arc::clone(&self._reader);
let writer = Arc::clone(&self._writer);
let warden_crypt = Arc::clone(&self._warden_crypt);
tokio::spawn(async move {
loop {
let result = query_receiver.recv().await;
match result {
Ok(output) => {
match output {
// ...
HandlerOutput::ConnectionRequest(host, port) => {
match Self::connect_inner(&host, port).await {
Ok(stream) => {
// Sent before the halves are swapped; `handle_read` reacts to it by dropping its
// pending `read_packet` call.
signal_sender.broadcast(Signal::Reconnect).await?;
// The session guard lives only inside this block, so it is released before the
// stream halves are replaced. Panics if `srp` is `None`.
let session_key = {
let guard = session.lock().await;
let srp = guard.srp.as_ref().unwrap();
srp.session_key.to_vec()
};
// THIS TASK HANGS HERE
// NOTE(review): `read_packet` keeps the reader mutex locked while it awaits `read()`,
// and `handle_read` calls `read_packet` again right after it has handled the signal.
// If that call takes the lock before `set_stream_halves` does, it waits on the OLD
// stream and the `reader.lock()` inside `set_stream_halves` cannot proceed until that
// read returns. This looks like the reported hang - confirm, and consider letting
// `handle_read` wait until the new reader has been installed.
Self::set_stream_halves(
stream,
Arc::clone(&reader),
Arc::clone(&writer),
Some(session_key.clone()),
Arc::clone(&warden_crypt),
).await;
// ...
}
Err(err) => {
// ...
}
}
}
// ...
_ => {}
};
}
Err(err) => {
// ...
}
};
}
Ok(())
})
}
/// Spawns the task that forwards incoming packets to the external logging server.
///
/// The task runs in three phases:
/// 1. Until `signal_receiver.recv()` completes, packets taken from `incoming_receiver`
///    are discarded.
/// 2. It connects to `127.0.0.1:3788`, retrying every 10 seconds until that succeeds.
/// 3. Every further packet is written to that connection as its header bytes followed by
///    its body.
///
/// The task ends with `Ok(())` once the packet channel is closed, and with an error when a
/// write to the logging server fails.
fn handle_external_write(
    &mut self,
    mut incoming_receiver: Receiver<IncomingPacket>,
    mut signal_receiver: BroadcastReceiver<Signal>,
) -> Task {
    tokio::spawn(async move {
        loop {
            tokio::select! {
                _ = signal_receiver.recv() => {
                    break;
                },
                dropped = incoming_receiver.recv() => {
                    // `None` means the packet channel is closed. A closed channel is
                    // ready on every poll, so staying in this loop would spin; there is
                    // nothing left to forward, so finish instead.
                    if dropped.is_none() {
                        return Ok(());
                    }
                },
            }
        }

        let mut stream = loop {
            match TcpStream::connect("127.0.0.1:3788").await {
                Ok(stream) => break stream,
                Err(_) => tokio::time::sleep(Duration::from_secs(10)).await,
            }
        };

        // THIS TASK HANGS HERE
        while let Some(incoming_packet) = incoming_receiver.recv().await {
            let IncomingPacket { header: mut packet, body, .. } = incoming_packet;
            packet.extend_from_slice(&body);
            // `write` may accept only part of the buffer; `write_all` keeps going until
            // the whole packet has been handed to the socket.
            stream.write_all(&packet).await?;
        }

        Ok(())
    })
}
/// Reads the next packet from the reader currently stored in `reader`.
///
/// The slot stays locked for the whole duration of the read.
///
/// # Panics
/// Panics if the slot is still `None`.
async fn read_packet(reader: Arc<Mutex<Option<Reader>>>) -> anyhow::Result<IncomingPacket> {
    let mut slot = reader.lock().await;
    let active = slot.as_mut().unwrap();
    active.read().await
}
/// Writes `packet` through the writer currently stored in `writer` and returns the result
/// of that write.
///
/// The slot stays locked for the whole duration of the write.
///
/// # Panics
/// Panics if the slot is still `None`.
async fn write_packet(
    writer: Arc<Mutex<Option<Writer>>>,
    packet: &mut OutgoingPacket,
) -> anyhow::Result<usize> {
    let mut slot = writer.lock().await;
    let active = slot.as_mut().unwrap();
    active.write(packet).await
}
}
Could somebody explain what I did wrong?