How to fix deadlock when use mutex?

I have TCP client which use bunch of tokio tasks. I need to read packet and put them into queue. I got result from queue in another task. And after certain packet I got deadlock (not sure why I got it only after certain packet). This is my tasks (where I faced with deadlock):

async fn handle_queue(&mut self) -> JoinHandle<()> {
	let input_queue = Arc::clone(&self._input_queue);
	let output_queue = Arc::clone(&self._output_queue);
	let session = Arc::clone(&self.session);
	let reader = Arc::clone(&self._reader);
	let writer = Arc::clone(&self._writer);
	// ...

	tokio::spawn(async move {
		loop {
			let client_flags = &mut *client_flags.lock().await;
			let connected_to_realm = client_flags.contains(ClientFlags::IS_CONNECTED_TO_REALM);

            // deadlock when use input_queue
			if let Some(packets) = input_queue.lock().await.pop_front() {
				for packet in packets {
					let processors = match connected_to_realm {
						true => Self::get_realm_processors(),
						false => Self::get_login_processors(),
					};

					let session = &mut *session.lock().await;
					let data_storage = &mut *data_storage.lock().await;
					let result_list = processors
						.iter()
						.map(|processor| {
							processor(HandlerInput {
								session,
								data: Some(&packet),
								data_storage,
								output_sender: output_sender.clone(),
							})
						})
						.flatten()
						.collect::<Vec<HandlerResult>>();

					for result in result_list {
						match result {
							Ok(output) => {
								match output {
									HandlerOutput::Data((opcode, header, body)) => {
										// ...
										output_queue.lock().await.push_back(packet);
									},
									HandlerOutput::ConnectionRequest(host, port) => {
										// ...
									},
									HandlerOutput::UpdateState(state) => {
										// ...
									},
									HandlerOutput::Void => {},
								};
							},
							Err(err) => {
								// ...
							},
						};

						sleep(Duration::from_millis(WRITE_TIMEOUT)).await;
					}
				}
			} else {
				sleep(Duration::from_millis(WRITE_TIMEOUT)).await;
			}
		}
	})
}

async fn handle_read(&mut self) -> JoinHandle<()> {
	let input_queue = Arc::clone(&self._input_queue);
	let reader = Arc::clone(&self._reader);
	// ...

	tokio::spawn(async move {
		loop {
			match &mut *reader.lock().await {
				Some(reader) => {
					if let Some(packets) = reader.read().await.ok() {
                         // because of deadlock packets cannot be put inside queue
						input_queue.lock().await.push_back(packets);
					}
				},
				None => {
					// ...
				},
			};

			sleep(Duration::from_millis(READ_TIMEOUT)).await;
		}
	})
}

this is how I run the tasks:

join_all(vec![
    // ... rest tasks
    self.handle_queue().await,
    self.handle_write().await,
]).await;

could somebody share some hints how can I fix deadlock ?

input_queue.lock().await creates a temporary value holding the lock that only gets dropped at the end of the if let body (this is just the way if let, match and while let work). Try assigning input_queue.lock().await.pop_front() to a local variable before the if let.

You may also want to drop client_flags (the one inside the loop) to release that lock as early as possible.

3 Likes

the issue was with client_flags ! thank you very much !

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.