Tokio BufReader hangs on await when wrap OwnedReadHalf

Hi community ! I have Reader that use tokio OwnedReadHalf and I want to wrap it into BufReader, but seems like I do smth wrong, because app hangs on self.stream.read_to_end(&mut packet).await?; (see code below):

pub struct Reader {
    stream: BufReader<OwnedReadHalf>,
    decryptor: Option<Decryptor>,
    warden_crypt: Arc<SyncMutex<Option<WardenCrypt>>>
}

impl Reader {
    pub fn new(reader: OwnedReadHalf) -> Self {
        Self {
            stream: BufReader::new(reader),
            decryptor: None,
            warden_crypt: Arc::new(SyncMutex::new(None))
        }
    }

    pub fn init(&mut self, session_key: &[u8], warden_crypt: Arc<SyncMutex<Option<WardenCrypt>>>) {
        self.decryptor = Some(Decryptor::new(session_key));
        self.warden_crypt = warden_crypt;
    }

    pub async fn read(&mut self) -> Result<Vec<Vec<u8>>, Error> {
        match self.decryptor.as_mut() {
            Some(decryptor) => {
                let mut packets = Vec::new();

                while !self.stream.buffer().is_empty() {
                    let mut header = vec![0u8; INCOMING_HEADER_LENGTH as usize];
                    self.stream.read_exact(&mut header).await?;

                    let mut reader = Cursor::new(decryptor.decrypt(&header));
                    let size = ReadBytesExt::read_u16::<BigEndian>(&mut reader)?;
                    let opcode = ReadBytesExt::read_u16::<LittleEndian>(&mut reader)?;

                    let mut body = vec![0u8; (size - INCOMING_OPCODE_LENGTH) as usize];
                    self.stream.read_exact(&mut body).await?;

                    if opcode == Opcode::SMSG_WARDEN_DATA {
                        let warden_crypt = &mut *self.warden_crypt.lock().unwrap();
                        body = warden_crypt.as_mut().unwrap().decrypt(&body);
                    }

                    let mut packet: Vec<u8> = Vec::new();
                    packet.append(&mut size.to_be_bytes().to_vec());
                    packet.append(&mut opcode.to_le_bytes().to_vec());
                    packet.append(&mut body);

                    packets.push(packet);
                }

                Ok(packets)
            },
            _ => {
                let mut packet = Vec::new();
                // app hangs here
                self.stream.read_to_end(&mut packet).await?;

                Ok(vec![packet])
            },
        }
    }
}

I suppose I missed smth so Reader cannot read anything from internal buffer. Could somebody help me to got where is the issue ?

Well, is the socket ever closed? The function does not return until the socket is closed.

no, socket is not closed. It should be keep alive. I use read() in loop.

When should the read end? The read_to_end method can only be used if you want to keep reading until the socket is closed.

I see. Well, probably read_to_end is not what should be used there. I just need to read whole packet that come from server. Probably would be better if I share the code that I used before:

pub fn read(&mut self) -> Result<Vec<Vec<u8>>, Error> {
	let mut buffer = [0u8; 65536];

	match self.stream.try_read(&mut buffer) {
		Ok(bytes_count) => {
			let result = match self.decryptor.as_mut() {
				Some(decryptor) => {
					let mut warden_crypt = self.warden_crypt.lock().unwrap();

					Self::parse_packets(
						buffer[..bytes_count].to_vec(),
						decryptor,
						warden_crypt.as_mut().unwrap(),
					)
				},
				_ => {
					vec![buffer[..bytes_count].to_vec()]
				},
			};

			Ok(result)
		},
		Err(err) => {
			// ...
		},
	}
}

so to get whole packet I read bytes_count bytes from buffer. I want to do smth like that with BufReader.

Also, I prefer not to use try_read, instead want to use async read.

How do you know how long the packets are? Do they have fixed length? Do they include a length header? Some other mechanism?

client connect to LoginServer first, packets from this server not contain size in its header, after client pass SRP authentication it reconnect to WorldServer and packets from this server contains size in header. So before I just relied on bytes_count value (but I expect that packet's length contains full width).

Also (probably this would be useful), WorldServer can send multiple packets split into one. So I read size, get size bytes from the packet, then read size again until buffer will be empty.

You cannot expect the size returned by read (or try_read) to have anything to do with the size of your packets.

1 Like

could you advice what can I do in this case ? I tried to use read_line instead, but got same result (stuck on await). I need to fix this part of code:

{
	let mut packet = Vec::new();
	// app hangs here
	self.stream.read_to_end(&mut packet).await?;

	Ok(vec![packet])
}

I want to read whole current buffer.

Well, I tried to do next:

{
	let buffer = self.stream.fill_buf().await?;
	let packet = buffer.to_vec();
	self.stream.consume(packet.len());

	Ok(vec![packet])
}

seems it works, but I need to investigate a bit more if all works as expected. Could you tell do you see any issues with this part of code ?

It's possible fill_buf will sometimes return before the entire message has been received. It only calls down to the reader if the internal buffer is empty, and the call to the inner reader may not return all of the remaining data at once either.

You say you need the "whole packet" but networking code usually doesn't concern itself with TCP packets. The application level messages sent over TCP are generally either fixed size, or contain a size somewhere early in the message so the receiver knows when they have the whole message.

Do you know what the message should contain?

yep, I know. And I can define expectable length for some first packets and even implement separate reader for LoginServer. But I want to avoid this.

If there's no fixed length, and no size included in the data, you're probably going to need to pull the data into a new buffer and start parsing it. If you find you don't have enough data you can try reading more data into your buffer.

Reading data this way is pretty error prone though which is why sizes are usually included if a message isn't fixed size.

1 Like

so, what is preferable way when use BufReader<...> ? could you explain common algorythm ? Since fill_buf sometimes return smth that not expected, how to get current buffer then ? need to use .buffer() instead ?

Figure out the smallest amount of data you can actually start parsing based on the message formats you're expecting and read that many bytes. Figure out which message type it is based on that data, and start parsing that message type. Any time you don't have enough data you can call into the reader again to get however many bytes you need to keep parsing.

1 Like

The thing to understand about TCP is that its a stream of individual bytes. TCP does not keep track of the boundaries between your packets for you. Your application must be able to look at the sequence of bytes and use some mechanism to determine where the boundaries are.

If the stream isn't closed after the message, then there's no way to say "read until the end of the packet", nor is there any way to say "read until the peer stops writing".

2 Likes

but I should call fill_buf() each time I need to request new data from server ?

read_exact is probably the right tool for the job.

Construct a mutable slice that's exactly as big as the next field you're expecting to parse, then call read_exact. Unlike other reader methods read_exact will block until it has enough data to fill the slice you pass it.

1 Like