Hi community ! I have Reader that use tokio OwnedReadHalf and I want to wrap it into BufReader, but seems like I do smth wrong, because app hangs on self.stream.read_to_end(&mut packet).await?; (see code below):
pub struct Reader {
stream: BufReader<OwnedReadHalf>,
decryptor: Option<Decryptor>,
warden_crypt: Arc<SyncMutex<Option<WardenCrypt>>>
}
impl Reader {
pub fn new(reader: OwnedReadHalf) -> Self {
Self {
stream: BufReader::new(reader),
decryptor: None,
warden_crypt: Arc::new(SyncMutex::new(None))
}
}
pub fn init(&mut self, session_key: &[u8], warden_crypt: Arc<SyncMutex<Option<WardenCrypt>>>) {
self.decryptor = Some(Decryptor::new(session_key));
self.warden_crypt = warden_crypt;
}
pub async fn read(&mut self) -> Result<Vec<Vec<u8>>, Error> {
match self.decryptor.as_mut() {
Some(decryptor) => {
let mut packets = Vec::new();
while !self.stream.buffer().is_empty() {
let mut header = vec![0u8; INCOMING_HEADER_LENGTH as usize];
self.stream.read_exact(&mut header).await?;
let mut reader = Cursor::new(decryptor.decrypt(&header));
let size = ReadBytesExt::read_u16::<BigEndian>(&mut reader)?;
let opcode = ReadBytesExt::read_u16::<LittleEndian>(&mut reader)?;
let mut body = vec![0u8; (size - INCOMING_OPCODE_LENGTH) as usize];
self.stream.read_exact(&mut body).await?;
if opcode == Opcode::SMSG_WARDEN_DATA {
let warden_crypt = &mut *self.warden_crypt.lock().unwrap();
body = warden_crypt.as_mut().unwrap().decrypt(&body);
}
let mut packet: Vec<u8> = Vec::new();
packet.append(&mut size.to_be_bytes().to_vec());
packet.append(&mut opcode.to_le_bytes().to_vec());
packet.append(&mut body);
packets.push(packet);
}
Ok(packets)
},
_ => {
let mut packet = Vec::new();
// app hangs here
self.stream.read_to_end(&mut packet).await?;
Ok(vec![packet])
},
}
}
}
I suppose I missed smth so Reader cannot read anything from internal buffer. Could somebody help me to got where is the issue ?
I see. Well, probably read_to_end is not what should be used there. I just need to read whole packet that come from server. Probably would be better if I share the code that I used before:
pub fn read(&mut self) -> Result<Vec<Vec<u8>>, Error> {
let mut buffer = [0u8; 65536];
match self.stream.try_read(&mut buffer) {
Ok(bytes_count) => {
let result = match self.decryptor.as_mut() {
Some(decryptor) => {
let mut warden_crypt = self.warden_crypt.lock().unwrap();
Self::parse_packets(
buffer[..bytes_count].to_vec(),
decryptor,
warden_crypt.as_mut().unwrap(),
)
},
_ => {
vec![buffer[..bytes_count].to_vec()]
},
};
Ok(result)
},
Err(err) => {
// ...
},
}
}
so to get whole packet I read bytes_count bytes from buffer. I want to do smth like that with BufReader.
Also, I prefer not to use try_read, instead want to use async read.
client connect to LoginServer first, packets from this server not contain size in its header, after client pass SRP authentication it reconnect to WorldServer and packets from this server contains size in header. So before I just relied on bytes_count value (but I expect that packet's length contains full width).
Also (probably this would be useful), WorldServer can send multiple packets split into one. So I read size, get size bytes from the packet, then read size again until buffer will be empty.
It's possible fill_buf will sometimes return before the entire message has been received. It only calls down to the reader if the internal buffer is empty, and the call to the inner reader may not return all of the remaining data at once either.
You say you need the "whole packet" but networking code usually doesn't concern itself with TCP packets. The application level messages sent over TCP are generally either fixed size, or contain a size somewhere early in the message so the receiver knows when they have the whole message.
If there's no fixed length, and no size included in the data, you're probably going to need to pull the data into a new buffer and start parsing it. If you find you don't have enough data you can try reading more data into your buffer.
Reading data this way is pretty error prone though which is why sizes are usually included if a message isn't fixed size.
so, what is preferable way when use BufReader<...> ? could you explain common algorythm ? Since fill_buf sometimes return smth that not expected, how to get current buffer then ? need to use .buffer() instead ?
Figure out the smallest amount of data you can actually start parsing based on the message formats you're expecting and read that many bytes. Figure out which message type it is based on that data, and start parsing that message type. Any time you don't have enough data you can call into the reader again to get however many bytes you need to keep parsing.
The thing to understand about TCP is that its a stream of individual bytes. TCP does not keep track of the boundaries between your packets for you. Your application must be able to look at the sequence of bytes and use some mechanism to determine where the boundaries are.
If the stream isn't closed after the message, then there's no way to say "read until the end of the packet", nor is there any way to say "read until the peer stops writing".
read_exact is probably the right tool for the job.
Construct a mutable slice that's exactly as big as the next field you're expecting to parse, then call read_exact. Unlike other reader methods read_exact will block until it has enough data to fill the slice you pass it.