I have TCP client which use bunch of tokio tasks. I need to read packet and put them into queue. I got result from queue in another task. And after certain packet I got deadlock (not sure why I got it only after certain packet). This is my tasks (where I faced with deadlock):
async fn handle_queue(&mut self) -> JoinHandle<()> {
let input_queue = Arc::clone(&self._input_queue);
let output_queue = Arc::clone(&self._output_queue);
let session = Arc::clone(&self.session);
let reader = Arc::clone(&self._reader);
let writer = Arc::clone(&self._writer);
// ...
tokio::spawn(async move {
loop {
let client_flags = &mut *client_flags.lock().await;
let connected_to_realm = client_flags.contains(ClientFlags::IS_CONNECTED_TO_REALM);
// deadlock when use input_queue
if let Some(packets) = input_queue.lock().await.pop_front() {
for packet in packets {
let processors = match connected_to_realm {
true => Self::get_realm_processors(),
false => Self::get_login_processors(),
};
let session = &mut *session.lock().await;
let data_storage = &mut *data_storage.lock().await;
let result_list = processors
.iter()
.map(|processor| {
processor(HandlerInput {
session,
data: Some(&packet),
data_storage,
output_sender: output_sender.clone(),
})
})
.flatten()
.collect::<Vec<HandlerResult>>();
for result in result_list {
match result {
Ok(output) => {
match output {
HandlerOutput::Data((opcode, header, body)) => {
// ...
output_queue.lock().await.push_back(packet);
},
HandlerOutput::ConnectionRequest(host, port) => {
// ...
},
HandlerOutput::UpdateState(state) => {
// ...
},
HandlerOutput::Void => {},
};
},
Err(err) => {
// ...
},
};
sleep(Duration::from_millis(WRITE_TIMEOUT)).await;
}
}
} else {
sleep(Duration::from_millis(WRITE_TIMEOUT)).await;
}
}
})
}
async fn handle_read(&mut self) -> JoinHandle<()> {
let input_queue = Arc::clone(&self._input_queue);
let reader = Arc::clone(&self._reader);
// ...
tokio::spawn(async move {
loop {
match &mut *reader.lock().await {
Some(reader) => {
if let Some(packets) = reader.read().await.ok() {
// because of deadlock packets cannot be put inside queue
input_queue.lock().await.push_back(packets);
}
},
None => {
// ...
},
};
sleep(Duration::from_millis(READ_TIMEOUT)).await;
}
})
}
this is how I run the tasks:
join_all(vec![
// ... rest tasks
self.handle_queue().await,
self.handle_write().await,
]).await;
could somebody share some hints how can I fix deadlock ?