Tokio mpsc blocked by smth in another thread (not clear what is blocker)

I trying to sync my reader/writer to make them wait for some changes from another thread. For this case I use tokio::mpsc (I also tried tokio::broadcast which from the first sight better fits this case but faced with Lagged issue and decided to try separate mpsc for each thread). But when I send two signals one-after-one I got only first signal received (seems like smth block second signal). This is my code:

pub struct Client {
	// ...
    _signal_sender1: mpsc::Sender<Signal>,
    _signal_receiver1: Arc<Mutex<mpsc::Receiver<Signal>>>,

    _signal_sender2: mpsc::Sender<Signal>,
    _signal_receiver2: Arc<Mutex<mpsc::Receiver<Signal>>>,
}

fn handle_queue(&mut self) -> JoinHandle<()> {
	let input_queue_receiver = Arc::clone(&self._input_queue_receiver);
	let output_queue_sender = self._output_queue_sender.clone();
	let session = Arc::clone(&self.session);
	let reader = Arc::clone(&self._reader);
	let writer = Arc::clone(&self._writer);
	let warden_crypt = Arc::clone(&self._warden_crypt);
	let client_flags = Arc::clone(&self.client_flags);
	let data_storage = Arc::clone(&self.data_storage);
	let signal_sender1 = self._signal_sender1.clone();
	let signal_sender2 = self._signal_sender2.clone();

	let mut message_income = self._income_message_pipe.lock().unwrap().message_income.clone();
	let dialog_income = self._income_message_pipe.lock().unwrap().dialog_income.clone();

	tokio::spawn(async move {
		loop {
			let packets = input_queue_receiver.lock().await.recv().await;
			if packets.is_some() {
				let processors = {
					let guard = client_flags.lock().unwrap();
					let connected_to_realm = guard.contains(ClientFlags::IS_CONNECTED_TO_REALM);

					match connected_to_realm {
						true => Self::get_realm_processors(),
						false => Self::get_login_processors(),
					}
				};

				for packet in packets.unwrap() {
					let mut input = HandlerInput {
						session: Arc::clone(&session),
						data: Some(&packet),
						data_storage: Arc::clone(&data_storage),
						message_income: message_income.clone(),
						dialog_income: dialog_income.clone(),
					};

					let handler_list = processors
						.iter()
						.map(|processor| processor(&mut input))
						.flatten()
						.collect::<ProcessorResult>();

					for mut handler in handler_list {
						let response = handler.handle(&mut input).await;
						match response {
							Ok(output) => {
								match output {
									HandlerOutput::Data((opcode, header, body)) => {},
									HandlerOutput::ConnectionRequest(host, port) => {
										match Self::connect_inner(&host, port).await {
											Ok(stream) => {
												// I should send signals before Self::set_stream_halves because of reader that blocks another thread otherwise
												signal_sender1.send(Signal::Reconnect).await.unwrap();
												signal_sender2.send(Signal::Reconnect).await.unwrap();

												Self::set_stream_halves(
													stream,
													Arc::clone(&reader),
													Arc::clone(&writer),
												).await;

												message_income.send_success_message(
													format!("Connected to {}:{}", host, port)
												);

											},
											Err(err) => {
												message_income.send_error_message(
													err.to_string()
												);
											}
										}
									},
									HandlerOutput::UpdateState(state) => {
										match state {
											State::SetEncryption(session_key) => {
												*warden_crypt.lock().unwrap() =
													Some(WardenCrypt::new(&session_key));

												if let Some(reader) = &mut *reader.lock().await
												{
													reader.init(
														&session_key,
														Arc::clone(&warden_crypt)
													);
													signal_sender1.send(Signal::NeedSync).await.unwrap();
												}

												if let Some(writer) = &mut *writer.lock().await
												{
													writer.init(&session_key);
													signal_sender2.send(Signal::NeedSync).await.unwrap();
												}
											},
											State::SetConnectedToRealm(is_authorized) => {},
										}
									},
									HandlerOutput::Freeze => {},
									HandlerOutput::Void => {},
								};
							},
							Err(err) => {},
						};
					}
				}
			}
		}
	})
}

fn handle_write(&mut self) -> JoinHandle<()> {
	let output_queue_receiver = Arc::clone(&self._output_queue_receiver);
	let writer = Arc::clone(&self._writer);
	let signal_receiver = Arc::clone(&self._signal_receiver2);

	let mut message_income = self._income_message_pipe.lock().unwrap().message_income.clone();

	tokio::spawn(async move {
		let mut need_sync = false;

		loop {
			let packet = output_queue_receiver.lock().await.recv().await;
			if packet.is_some() {
				let packet = packet.unwrap();
				if !packet.is_empty() {
					let receiver = &mut *signal_receiver.lock().await;

					tokio::select! {
						Some(signal) = receiver.recv() => {
							match signal {
								Signal::Reconnect => {
									// ...
								},
								Signal::NeedSync => {
									need_sync = true;
								},
							};
						},
						result = Self::write_packet(&writer, packet), if !need_sync => {
							match result {
								Ok(bytes_sent) => {
									message_income.send_debug_message(
										format!("{} bytes sent", bytes_sent)
									);
								},
								Err(err) => {
									message_income.send_client_message(err.to_string());
								}
							}
						},
						result = Self::check_encryptor(&writer), if need_sync => {
							if result {
								need_sync = false;
							}
						},
					}
				}
			}
		}
	})
}

fn handle_read(&mut self) -> JoinHandle<()> {
	let input_queue_sender = self._input_queue_sender.clone();
	let reader = Arc::clone(&self._reader);
	let signal_receiver = Arc::clone(&self._signal_receiver1);

	let mut message_income = self._income_message_pipe.lock().unwrap().message_income.clone();

	tokio::spawn(async move {
		let mut need_sync = false;

		loop {
			let receiver = &mut *signal_receiver.lock().await;
			tokio::select! {
				Some(signal) = receiver.recv() => {
					match signal {
						Signal::Reconnect => {
							// ...
						},
						Signal::NeedSync => {
							need_sync = true;
						},
					};
				},
				Some(packets) = Self::read_packets(&reader), if !need_sync => {
					input_queue_sender.send(packets).await.unwrap();
				},
				result = Self::check_decryptor(&reader), if need_sync => {
					if result {
						need_sync = false;
					}
				},
			};
		}
	})
}

async fn read_packets(reader: &Arc<Mutex<Option<Reader>>>) -> Option<Vec<Vec<u8>>> {
	if let Some(reader) = &mut *reader.lock().await {
		if let Some(packets) = reader.read().await.ok() {
			if !packets.is_empty() {
				return Some(packets);
			}
		}
	}

	None
}

async fn write_packet(writer: &Arc<Mutex<Option<Writer>>>, packet: Vec<u8>) -> Result<usize, Error> {
	let mut error = Error::new(ErrorKind::NotFound, "Not connected to TCP");

	match &mut *writer.lock().await {
		Some(writer) => {
			match writer.write(&packet).await {
				Ok(bytes_sent) => {
					return Ok(bytes_sent);
				},
				Err(err) => {
					error = err;
				}
			};
		},
		_ => {},
	};

	Err(error)
}

async fn check_decryptor(reader: &Arc<Mutex<Option<Reader>>>) -> bool {
	if let Some(reader) = &mut *reader.lock().await {
		return reader.has_decryptor();
	}

	false
}

async fn check_encryptor(reader: &Arc<Mutex<Option<Writer>>>) -> bool {
	if let Some(writer) = &mut *reader.lock().await {
		return writer.has_encryptor();
	}

	false
}

Sometimes I see that handle_write task receive signals, sometimes one signal received by handle_read and one by handle_write, but in most cases handle_read receive both and handle_write receive nothing.

Could somebody explain me this weird behavior ?

UPD: please check handle_queue for signal_sender1 and signal_sender2, I have two cases there where I use this senders.

It seems like you should split your Client struct into several structs. Consider reading this article: actors with Tokio.

1 Like

could you tell, does it mean Arc<Mutex<...>> will not solve the issue ?
(from what I got from the article you shared about actor access to the main struct fields it's still not clear about Arc<Mutex<...>>)

Don't put your channel inside Arc/Mutex.

1 Like

you mean channel halves right ? or, in my case, it's receiver only

Both sender and receiver.

1 Like

I have method where all my tasks wrapped into join_all:

join_all(vec![
	// ...
	self.handle_queue(),
	self.handle_read(),
	self.handle_write(),
]).await;

so to run tasks I run this method.

According to the pattern I should remove from join_all at least handle_read and handle_write doesn't it ?

Using join_all is like spawning. The general point is that each call should have its own struct instead of sharing self.

1 Like