Tokio async read/write tasks don't work without sleep (client just freezes)

I have a weird error, and I am not even sure what the reason could be. In short, the read/write methods in my TCP client don't work without a sleep at the end of the loop. The client just stays frozen — it seems like one of the .lock() calls is never released.

So, this is my code:

// Loop back-off intervals, in milliseconds (passed to Duration::from_millis below).
const READ_TIMEOUT: u64 = 250;
const WRITE_TIMEOUT: u64 = 1;

// Wrapper owning the read half of the split TCP stream.
pub struct Reader {
    stream: OwnedReadHalf,
    // ...
}

// Wrapper owning the write half of the split TCP stream.
pub struct Writer {
    stream: OwnedWriteHalf,
    // ...
}

// initial connection
// Opens the initial TCP connection and installs the stream halves into
// the shared reader/writer slots.
pub async fn connect(&mut self, host: &str, port: u16) -> Result<(), Error> {
	// ...

	return match Self::connect_inner(host, port).await {
		Ok(stream) => {
			// NOTE(review): this awaits both the reader and the writer
			// mutexes; if any spawned task holds one of those guards across
			// a long .await, this call blocks here (see the reconnect
			// freeze discussed later in the thread).
			Self::set_stream_halves(stream, &self._reader, &self._writer).await;
			// ...
		},
		Err(err) => {
			// ...
		},
	}
}

/// Opens a TCP connection to `host:port`.
///
/// Returns the connected stream, or the underlying I/O error from
/// `TcpStream::connect`.
async fn connect_inner(host: &str, port: u16) -> Result<TcpStream, Error> {
	// A `match` that maps Ok -> Ok and Err -> Err is redundant: the
	// connect future already yields exactly the Result we want.
	TcpStream::connect(format!("{}:{}", host, port)).await
}

// Splits `stream` and stores the two halves into the shared slots,
// replacing any previous connection.
//
// NOTE(review): this awaits two tokio mutexes in sequence. If another
// task is holding either guard across a long .await (a blocked socket
// read, a channel recv, ...), this function parks on that lock — which
// is exactly the reconnect freeze described later in the thread.
async fn set_stream_halves(
	stream: TcpStream,
	reader: &Arc<Mutex<Option<Reader>>>,
	writer: &Arc<Mutex<Option<Writer>>>
) {
	let (rx, tx) = stream.into_split();

	let mut reader = reader.lock().await;
	*reader = Some(Reader::new(rx));
	let mut writer = writer.lock().await;
	*writer = Some(Writer::new(tx));
}

// RECONNECT handles below
// Spawns the task that drains the input queue and dispatches each packet
// to its handlers; a handler may request a reconnect.
fn handle_queue(&mut self) -> JoinHandle<()> {
    // reader and writer are Arc<Mutex<Option<Reader or Writer>>>
	let reader = Arc::clone(&self._reader);
	let writer = Arc::clone(&self._writer);

	tokio::spawn(async move {
		loop {
			// NOTE(review): a sync mutex + pop_front() polled in a tight
			// loop busy-waits whenever the queue is empty; a channel recv
			// would park this task instead.
			let packets = input_queue.lock().unwrap().pop_front();
			if packets.is_some() {
				for packet in packets.unwrap() {
					// ...

					for mut handler in handler_list {
						let response = handler.handle(&mut input).await;
						match response {
							Ok(output) => {
								match output {
									HandlerOutput::Data((opcode, header, body)) => {},
									HandlerOutput::ConnectionRequest(host, port) => {
										match Self::connect_inner(&host, port).await {
											Ok(stream) => {
												// Swap both stream halves under their locks;
												// blocks if another task holds either guard.
												Self::set_stream_halves(
													stream, &reader, &writer
												).await;

												message_income.send_success_message(message);

											},
											Err(err) => {
												message_income.send_error_message(
													err.to_string()
												);
											}
										}
									},
									HandlerOutput::UpdateState(state) => {},
									HandlerOutput::Freeze => {},
									HandlerOutput::Void => {},
								};
							},
							Err(err) => {},
						};

						// Per-handler back-off; also yields to the other tasks.
						sleep(Duration::from_millis(WRITE_TIMEOUT)).await;
					}
				}
			}
		}
	})
}

// Spawns the task that pops outgoing packets and writes them to the socket.
fn handle_write(&mut self) -> JoinHandle<()> {
	// ...
	let output_queue = Arc::clone(&self._output_queue);
	let writer = Arc::clone(&self._writer);

	tokio::spawn(async move {
		loop {
			// NOTE(review): the writer mutex is taken *before* checking for a
			// packet, so the guard is held for the whole iteration (including
			// the write await). Checking the queue first and locking only for
			// the actual write would keep the writer available to other tasks.
			match &mut *writer.lock().await {
				Some(writer) => {
					let packet = output_queue.lock().unwrap().pop_front();
					if packet.is_some() {
						message_income.send_success_message(format!("HAS PACKET"));
						let packet = packet.unwrap();
						if !packet.is_empty() {
							match writer.write(&packet).await {
								Ok(bytes_amount) => {},
								Err(err) => {}
							};
						}
					}
				},
				None => {},
			};

			// Back-off; without it this loop busy-spins and monopolizes the lock.
			sleep(Duration::from_millis(WRITE_TIMEOUT)).await;
		}
	})
}

// Spawns the task that reads packets from the socket, racing the read
// against the reconnect signal.
fn handle_read(&mut self) -> JoinHandle<()> {
	// ...
	let input_queue = Arc::clone(&self._input_queue);
	let reader = Arc::clone(&self._reader);
	let reconnect_signal_receiver = Arc::clone(&self._reconnect_signal_receiver);

	tokio::spawn(async move {
		loop {
			// get signal on reconnect
			let receiver = &mut *reconnect_signal_receiver.lock().await;
			// NOTE(review): select! drops (cancels) the losing branch; a
			// reconnect signal therefore aborts the in-flight read_packets
			// future, which releases the reader lock it was holding.
			tokio::select! {
				_ = receiver.recv() => {},
				Some(packets) = Self::read_packets(&reader) => {
					input_queue.lock().unwrap().push_back(packets);
				},
			};

			// if I comment this client just frozen after reconnect
			// NOTE(review): the sleep "helps" because while this task sleeps
			// it holds neither the reader nor the receiver lock, giving
			// set_stream_halves a window to acquire them — TODO confirm.
			sleep(Duration::from_millis(READ_TIMEOUT)).await;
		}
	})
}

// Reads from the shared reader, returning decoded packets if any arrived.
//
// NOTE(review): the guard from `reader.lock().await` is held across
// `reader.read().await`. While the socket is idle this future parks
// *holding the reader mutex*, so anything else awaiting that lock
// (e.g. set_stream_halves during a reconnect) blocks until data arrives
// or this future is cancelled by the surrounding select!.
async fn read_packets(reader: &Arc<Mutex<Option<Reader>>>) -> Option<Vec<Vec<u8>>> {
	if let Some(reader) = &mut *reader.lock().await {
		if let Some(packets) = reader.read().await.ok() {
			if !packets.is_empty() {
				return Some(packets);
			}
		}
	}

	None
}

My questions:

  1. Could somebody help me find the reason for this issue?
  2. Why does the sleep make handle_read and handle_write work (the timeout has to be set in both)?
  3. Sometimes handle_write sends packets in the wrong order — what could potentially affect the ordering in my case?

What do your read and write methods look like?

From code I posted above:

// Spawns the task that pops outgoing packets and writes them to the socket.
fn handle_write(&mut self) -> JoinHandle<()> {
	// ...
	let output_queue = Arc::clone(&self._output_queue);
	let writer = Arc::clone(&self._writer);

	tokio::spawn(async move {
		loop {
			// NOTE(review): the writer mutex is taken before checking the
			// queue, so the guard is held for the entire iteration instead
			// of just the write call.
			match &mut *writer.lock().await {
				Some(writer) => {
					let packet = output_queue.lock().unwrap().pop_front();
					if packet.is_some() {
						message_income.send_success_message(format!("HAS PACKET"));
						let packet = packet.unwrap();
						if !packet.is_empty() {
							match writer.write(&packet).await {
								Ok(bytes_amount) => {},
								Err(err) => {}
							};
						}
					}
				},
				None => {},
			};

			// Back-off; without it this loop busy-spins holding the lock.
			sleep(Duration::from_millis(WRITE_TIMEOUT)).await;
		}
	})
}

// Spawns the task that reads packets from the socket, racing the read
// against the reconnect signal.
fn handle_read(&mut self) -> JoinHandle<()> {
	// ...
	let input_queue = Arc::clone(&self._input_queue);
	let reader = Arc::clone(&self._reader);
	let reconnect_signal_receiver = Arc::clone(&self._reconnect_signal_receiver);

	tokio::spawn(async move {
		loop {
			// get signal on reconnect
			let receiver = &mut *reconnect_signal_receiver.lock().await;
			// NOTE(review): the cancelled select! branch releases the reader
			// lock held inside read_packets.
			tokio::select! {
				_ = receiver.recv() => {},
				Some(packets) = Self::read_packets(&reader) => {
					input_queue.lock().unwrap().push_back(packets);
				},
			};

			// if I comment this client just frozen after reconnect
			// NOTE(review): during the sleep this task holds no locks, which
			// gives the reconnect path a window to swap the stream halves.
			sleep(Duration::from_millis(READ_TIMEOUT)).await;
		}
	})
}

// Reads from the shared reader, returning decoded packets if any arrived.
//
// NOTE(review): the reader mutex guard is held across `read().await`;
// with an idle socket this future parks while holding the lock, blocking
// any other task that awaits the reader mutex.
async fn read_packets(reader: &Arc<Mutex<Option<Reader>>>) -> Option<Vec<Vec<u8>>> {
	if let Some(reader) = &mut *reader.lock().await {
		if let Some(packets) = reader.read().await.ok() {
			if !packets.is_empty() {
				return Some(packets);
			}
		}
	}

	None
}

Right but you're calling read and write on your custom structs. What's happening in those methods?

Well, I have handle_connection method:

// Spawns every background task and awaits them all. The tasks loop
// forever, so in practice this awaits indefinitely.
pub async fn handle_connection(&mut self) {
	join_all(vec![
		// ...
		self.handle_queue(),
		self.handle_read(),
		self.handle_write(),
	]).await;
}

I call this method directly after client initialization:

// Entry point: connect once, then drive the background tasks forever.
#[tokio::main]
async fn main() {
    let mut client = Client::new();
    client.connect("127.0.0.1", 3724).await.unwrap();
    client.handle_connection().await;
}

Does this answer your question? If not, please let me know what else I should add.

You have these structs, which you call read and write on. What are those methods doing?

1 Like

ah, I see. Well, this is code from Reader:

// Read half of the connection plus the crypto state needed to decode
// incoming packet headers and warden payloads.
pub struct Reader {
    stream: OwnedReadHalf,
    decryptor: Option<Decryptor>,
    warden_crypt: Arc<SyncMutex<Option<WardenCrypt>>>
}

impl Reader {
    // A fresh reader starts without crypto; init() enables it once the
    // session key is known.
    pub fn new(reader: OwnedReadHalf) -> Self {
        Self {
            stream: reader,
            decryptor: None,
            warden_crypt: Arc::new(SyncMutex::new(None))
        }
    }

    // Enables header decryption (and warden decryption) for this session.
    pub fn init(&mut self, session_key: &[u8], warden_crypt: Arc<SyncMutex<Option<WardenCrypt>>>) {
        self.decryptor = Some(Decryptor::new(session_key));
        self.warden_crypt = warden_crypt;
    }

    // Reads one chunk from the socket and splits it into framed packets
    // (size + opcode + body) when a decryptor is installed; otherwise
    // returns the raw bytes as a single packet.
    pub async fn read(&mut self) -> Result<Vec<Vec<u8>>, Error> {
        let mut buffer = [0u8; 65536];

        // NOTE(review): bytes_count == 0 (peer closed) is not treated
        // specially — the caller cannot distinguish EOF from "no packets".
        match self.stream.read(&mut buffer).await {
            Ok(bytes_count) => {
                let result = match self.decryptor.as_mut() {
                    Some(decryptor) => {
                        let raw_data = buffer[..bytes_count].to_vec();
                        let mut reader = Cursor::new(&raw_data);

                        let mut packets = Vec::new();
                        while reader.position() < (raw_data.len() as u64) {
                            let mut header = [0u8; INCOMING_HEADER_LENGTH as usize];
                            // NOTE(review): unwrap panics if the chunk ends mid-header.
                            std::io::Read::read_exact(&mut reader, &mut header).unwrap();

                            let mut header_reader = Cursor::new(decryptor.decrypt(&header));
                            let size = ReadBytesExt::read_u16::<BigEndian>(&mut header_reader).unwrap();
                            let opcode = ReadBytesExt::read_u16::<LittleEndian>(&mut header_reader).unwrap();

                            // NOTE(review): underflows (panics in debug builds) if the
                            // decrypted size is smaller than INCOMING_OPCODE_LENGTH.
                            let mut body = vec![0u8; (size - INCOMING_OPCODE_LENGTH) as usize];

                            match std::io::Read::read_exact(&mut reader, &mut body) {
                                Ok(_) => {},
                                Err(_) => {
                                    // NOTE(review): TCP is a byte stream — a packet split
                                    // across two reads hits this break and its already-read
                                    // header plus partial body are silently discarded.
                                    break;
                                }
                            }
                            if opcode == Opcode::SMSG_WARDEN_DATA {
                                body = self.warden_crypt.lock().unwrap().as_mut().unwrap().decrypt(&body);
                            }

                            // Re-assemble the decrypted header in front of the body.
                            let mut packet: Vec<u8> = Vec::new();
                            packet.append(&mut size.to_be_bytes().to_vec());
                            packet.append(&mut opcode.to_le_bytes().to_vec());
                            packet.append(&mut body);

                            packets.push(packet);
                        }

                        packets
                    },
                    _ => {
                        vec![buffer[..bytes_count].to_vec()]
                    },
                };

                Ok(result)
            },
            // BUG(review): this arm evaluates to `()` while the function
            // returns Result — it does not compile as posted; presumably
            // the `Err(err)` propagation was elided from the quote.
            Err(err) => {},
        }
    }
}

This is code from Writer:

// Write half of the connection plus the optional outgoing encryptor.
pub struct Writer {
    stream: OwnedWriteHalf,
    encryptor: Option<Encryptor>,
}

impl Writer {
    // A fresh writer starts without encryption; init() enables it once
    // the session key is known.
    pub fn new(writer: OwnedWriteHalf) -> Self {
        Self {
            stream: writer,
            encryptor: None,
        }
    }

    // Enables outgoing header encryption for this session.
    pub fn init(&mut self, session_key: &[u8]) {
        self.encryptor = Some(Encryptor::new(session_key));
    }

    // Encrypts (if enabled) and writes one packet, flushing afterwards.
    pub async fn write(&mut self, packet: &[u8]) -> Result<usize, Error> {
        let packet = match self.encryptor.as_mut() {
            Some(encryptor) => encryptor.encrypt(packet),
            _ => packet.to_vec(),
        };

        // NOTE(review): `write` may perform a *partial* write (returned
        // bytes_amount can be < packet.len()); `write_all` would guarantee
        // the whole packet goes out. Also, flush().unwrap() panics on I/O
        // error instead of returning it.
        return match self.stream.write(&packet).await {
            Ok(bytes_amount) => {
                let _ = &self.stream.flush().await.unwrap();
                Ok(bytes_amount)
            },
            Err(err) => Err(err),
        }
    }
}

SyncMutex is std::sync::Mutex here

Oh I missed it since the types weren't included, but input_queue and output_queue don't appear to be async aware. So those loops that check them are going to busy wait, which can cause all kinds of problems. Is there a reason you aren't using a channel for that instead of a normal data structure?

To be honest, I didn't think about a channel in this case. Let me try to refactor the code using tokio::mpsc and I will post updates here.

@semicoleon during refactoring I got another error with some my data structures that used inside my client:

error: future cannot be sent between threads safely
   --> src\client\mod.rs:293:9
    |
293 |         tokio::spawn(async move {
    |         ^^^^^^^^^^^^ future created by async block is not `Send`
    |
    = help: within `impl futures::Future<Output = [async output]>`, the trait `std::marker::Send` is not implemented for `std::sync::MutexGuard<'_, Session>`
note: future is not `Send` as this value is used across an await
   --> src\client\movement\ai\movement.rs:84:65
    |
38  |         let mut session = input.session.lock().unwrap();
    |             ----------- has type `std::sync::MutexGuard<'_, Session>` which is not `Send`
...
84  |                                 input.output_queue_sender.lock().await.send(packet).await.unwrap();
    |                                                                 ^^^^^^ await occurs here, with `mut session` maybe used later
...
117 |     }
    |     - `mut session` is later dropped here

and this:

error: future cannot be sent between threads safely
   --> src\client\mod.rs:293:9
    |
293 |         tokio::spawn(async move {
    |         ^^^^^^^^^^^^ future created by async block is not `Send`
    |
    = help: within `impl futures::Future<Output = [async output]>`, the trait `std::marker::Send` is not implemented for `std::sync::MutexGuard<'_, DataStorage>`
note: future is not `Send` as this value is used across an await
   --> src\client\movement\ai\movement.rs:84:65
    |
39  |         let players_map = &mut input.data_storage.lock().unwrap().players_map;
    |                                ---------------------------------- has type `std::sync::MutexGuard<'_, DataStorage>` which is not `Send`
...
84  |                                 input.output_queue_sender.lock().await.send(packet).await.unwrap();
    |                                                                 ^^^^^^ await occurs here, with `input.data_storage.lock().unwrap()` maybe used later
...
117 |     }
    |     - `input.data_storage.lock().unwrap()` is later dropped here

This is about Session and DataStorage data structures that I defined to store some data and share between threads. This structures are wrapped in SyncMutex:

pub struct Client {
	// ...
    session: Arc<SyncMutex<Session>>,
    data_storage: Arc<SyncMutex<DataStorage>>,
}

I use this structures anywhere in the code. So, the question: does it make sense to wrap them into tokio::sync::Mutex instead ? Or probably need to find another way ?

Please let me know if I should share extra info.

Keeping a sync Mutex locked across an await point is basically never going to do what you want. You should probably be copying the data you need and then releasing the lock.

1 Like

To release the lock, do I need to call drop() on it?

That would work, but it's often clearer to do something like

let data = {
    let guard = mutex.lock().unwrap();
    guard.data_field.clone()
};

That makes it obvious that you're just getting some data out of the Mutex and not holding on to the lock.

1 Like

Is there a similar approach for mutating data? For cases where I need to update some field or call some method on the shared data structure?

For example, I have session.state_flags field and I need to call .set() method on it. For now I do this like below:

session.lock().unwrap().state_flags.set(StateFlags::SOME_FLAG, false);

Or I need to override session.follow_guid, for now I do this like below:

let mut session = input.session.lock().unwrap();
session.follow_guid = Some(guid);

Probably wrapping such blocks in curly braces will do the trick?

Yup that would work

1 Like

After refactoring, the client freezes on reconnect; from what I found while debugging, it freezes here:

async fn set_stream_halves(
        stream: TcpStream,
        reader: &Arc<Mutex<Option<Reader>>>,
        writer: &Arc<Mutex<Option<Writer>>>
) {
	let (rx, tx) = stream.into_split();

	let mut reader = reader.lock().await;
	*reader = Some(Reader::new(rx));
	// after this line frozen
	let mut writer = writer.lock().await;
	*writer = Some(Writer::new(tx));
}

Minimal full code for now is next:


// Shared state of the TCP client; every field is behind an Arc so each
// spawned task can hold its own handle.
//
// NOTE(review): mpsc::Sender is Clone + Send and needs no Mutex — only
// the single Receiver requires exclusive ownership; wrapping senders in
// tokio mutexes just adds lock traffic.
pub struct Client {
    _reader: Arc<Mutex<Option<Reader>>>,
    _writer: Arc<Mutex<Option<Writer>>>,
    _warden_crypt: Arc<SyncMutex<Option<WardenCrypt>>>,
    // _input_queue: Arc<SyncMutex<VecDeque<Vec<Vec<u8>>>>>,
    _input_queue_sender: Arc<Mutex<mpsc::Sender<PacketList>>>,
    _input_queue_receiver: Arc<Mutex<mpsc::Receiver<PacketList>>>,
    // _output_queue: Arc<SyncMutex<VecDeque<Vec<u8>>>>,
    _output_queue_sender: Arc<Mutex<mpsc::Sender<Vec<u8>>>>,
    _output_queue_receiver: Arc<Mutex<mpsc::Receiver<Vec<u8>>>>,
    _income_message_pipe: Arc<SyncMutex<IncomeMessagePipe>>,
    _outcome_message_pipe: Arc<SyncMutex<OutcomeMessagePipe>>,
    _reconnect_signal_sender: Arc<Mutex<mpsc::Sender<bool>>>,
    _reconnect_signal_receiver: Arc<Mutex<mpsc::Receiver<bool>>>,

    session: Arc<SyncMutex<Session>>,
    data_storage: Arc<SyncMutex<DataStorage>>,
    client_flags: Arc<SyncMutex<ClientFlags>>,
}

/// Opens a TCP connection to `host:port`.
///
/// Returns the connected stream, or the underlying I/O error from
/// `TcpStream::connect`.
async fn connect_inner(host: &str, port: u16) -> Result<TcpStream, Error> {
	// A `match` that maps Ok -> Ok and Err -> Err is redundant: the
	// connect future already yields exactly the Result we want.
	TcpStream::connect(format!("{}:{}", host, port)).await
}

// Splits `stream` and stores the two halves into the shared slots.
async fn set_stream_halves(
	stream: TcpStream,
	reader: &Arc<Mutex<Option<Reader>>>,
	writer: &Arc<Mutex<Option<Writer>>>
) {
	let (rx, tx) = stream.into_split();

	let mut reader = reader.lock().await;
	*reader = Some(Reader::new(rx));
	// after this line frozen
	// NOTE(review): the freeze happens because handle_write acquires
	// `writer.lock().await` and then parks inside `recv().await` waiting
	// for an outgoing packet — the writer guard is held across that await,
	// so this lock() can never succeed. handle_write must receive the
	// packet first and only then lock the writer.
	let mut writer = writer.lock().await;
	*writer = Some(Writer::new(tx));
}

// Spawns every background task and awaits them all; the tasks loop
// forever, so in practice this awaits indefinitely.
pub async fn handle_connection(&mut self) {
	// ...

	join_all(vec![
		// ...
		self.handle_queue(),
		self.handle_read(),
		self.handle_write(),
	]).await;
}

// Spawns the task that receives incoming packet lists from the channel,
// runs them through the login/realm processors, and executes each
// handler's output (send data, reconnect, update state, ...).
fn handle_queue(&mut self) -> JoinHandle<()> {
	let input_queue_receiver = Arc::clone(&self._input_queue_receiver);
	let output_queue_sender = Arc::clone(&self._output_queue_sender);
	let session = Arc::clone(&self.session);
	let reader = Arc::clone(&self._reader);
	let writer = Arc::clone(&self._writer);
	let warden_crypt = Arc::clone(&self._warden_crypt);
	let client_flags = Arc::clone(&self.client_flags);
	let data_storage = Arc::clone(&self.data_storage);
	let signal_sender = Arc::clone(&self._reconnect_signal_sender);

	let mut message_income = self._income_message_pipe.lock().unwrap().message_income.clone();
	let dialog_income = self._income_message_pipe.lock().unwrap().dialog_income.clone();

	tokio::spawn(async move {
		loop {
			// let packets = input_queue.lock().unwrap().pop_front();
			// Parks until a packet list arrives (no more busy wait).
			let packets = input_queue_receiver.lock().await.recv().await;
			if packets.is_some() {
				let connected_to_realm = client_flags.lock().unwrap().contains(ClientFlags::IS_CONNECTED_TO_REALM);

				for packet in packets.unwrap() {
					// Pick the processor set for the current connection phase.
					let processors = match connected_to_realm {
						true => Self::get_realm_processors(),
						false => Self::get_login_processors(),
					};

					let mut input = HandlerInput {
						session: Arc::clone(&session),
						// packet: size + opcode + body, need to parse separately
						data: Some(&packet),
						data_storage: Arc::clone(&data_storage),
						message_income: message_income.clone(),
						dialog_income: dialog_income.clone(),
					};

					let handler_list = processors
						.iter()
						.map(|processor| processor(&mut input))
						.flatten()
						.collect::<ProcessorResult>();

					for mut handler in handler_list {
						let response = handler.handle(&mut input).await;
						match response {
							Ok(output) => {
								match output {
									HandlerOutput::Data((opcode, header, body)) => {
										message_income.send_client_message(
											Opcode::get_opcode_name(opcode)
										);
										let body = match opcode {
											Opcode::CMSG_WARDEN_DATA => {
												warden_crypt.lock().unwrap().as_mut().unwrap().encrypt(&body)
											},
											_ => body,
										};

										let packet = [header, body].concat();
										// output_queue.lock().unwrap().push_back(packet);
										output_queue_sender.lock().await.send(packet).await.unwrap();
									},
									HandlerOutput::ConnectionRequest(host, port) => {
										match Self::connect_inner(&host, port).await {
											Ok(stream) => {
												signal_sender.lock().await.send(true).await.unwrap();

												// frozen here
												// NOTE(review): frozen because handle_write holds
												// `writer.lock().await` while parked in `recv().await`,
												// so the writer slot can never be swapped here.
												Self::set_stream_halves(
													stream, &reader, &writer
												).await;

												message_income.send_success_message(
													format!("Connected to {}:{}", host, port)
												);

											},
											Err(err) => {
												message_income.send_error_message(
													err.to_string()
												);
											}
										}
									},
									HandlerOutput::UpdateState(state) => {},
									HandlerOutput::Freeze => {},
									HandlerOutput::Void => {},
								};
							},
							Err(err) => {},
						};
					}
				}
			}
		}
	})
}

// Spawns the task that receives outgoing packets and writes them.
fn handle_write(&mut self) -> JoinHandle<()> {
	let output_queue_receiver = Arc::clone(&self._output_queue_receiver);
	let writer = Arc::clone(&self._writer);

	let mut message_income = self._income_message_pipe.lock().unwrap().message_income.clone();

	tokio::spawn(async move {
		loop {
			// BUG(review): the writer guard is acquired *before* waiting for
			// a packet, and `recv().await` can park indefinitely — so the
			// writer mutex stays locked across that await. This is what
			// deadlocks set_stream_halves during reconnect. Receive the
			// packet first, then lock the writer only for the actual write.
			match &mut *writer.lock().await {
				Some(writer) => {
					let packet = output_queue_receiver.lock().await.recv().await;
					if packet.is_some() {
						// ...
					}
				},
				None => {},
			};
		}
	})
}

// Spawns the task that reads packets from the socket, racing the read
// against the reconnect signal.
fn handle_read(&mut self) -> JoinHandle<()> {
	let input_queue_sender = Arc::clone(&self._input_queue_sender);
	let reader = Arc::clone(&self._reader);
	let reconnect_signal_receiver = Arc::clone(&self._reconnect_signal_receiver);

	let mut message_income = self._income_message_pipe.lock().unwrap().message_income.clone();

	tokio::spawn(async move {
		loop {
			// get signal on reconnect
			let receiver = &mut *reconnect_signal_receiver.lock().await;
			// NOTE(review): select! cancels the losing branch, so a reconnect
			// signal aborts the in-flight read_packets future and releases
			// the reader lock it held — that is why the *reader* half can be
			// swapped while the writer half (see handle_write) cannot.
			tokio::select! {
				_ = receiver.recv() => {},
				Some(packets) = Self::read_packets(&reader) => {
					// ...
					// input_queue.lock().unwrap().push_back(packets);
					input_queue_sender.lock().await.send(packets).await.unwrap();
				},
			};
		}
	})
}

handle_write is wrong, you want to check for packets first before you lock the writer. Otherwise the writer will stay locked while you wait for something to write, which matches your description of what's happening.

Is there a reason you put the mpsc halves in a Mutex? They're intended to be thread safe, so most of the time you don't need additional synchronization on them.

1 Like

For the sender I think there's no reason, since I can just clone it, but the receiver is what I cannot clone, so I need to wrap it to pass it into the task. Could you advise a more appropriate approach to use with receiver halves?

Well you could pass the receiver into the method instead of having it in the struct, I don't think you have things set up for spawning the tasks multiple times to work correctly anyway. I could be wrong about that though.

1 Like

Are you essentially using the channel as MPMC and not MPSC? In this case, you might have better luck with a dedicated implementation, like async-channel.

1 Like