Packet duplication when `read_exact` is used

Hi folks!

I have an issue with my packet reader implementation. The part of the read method where I use read_exact seems to read the packets twice:

use std::io::{Cursor, Error};
use std::sync::{Arc, Mutex as SyncMutex};
use byteorder::{BigEndian, LittleEndian, ReadBytesExt};
use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader};
use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};

// ...

pub struct Reader {
    _stream: BufReader<OwnedReadHalf>,
    _decryptor: Option<Decryptor>,
    _warden_crypt: Arc<SyncMutex<Option<WardenCrypt>>>,
    _need_sync: bool,
}

impl Reader {
    pub fn new(reader: OwnedReadHalf) -> Self {
        Self {
            _stream: BufReader::new(reader),
            _decryptor: None,
            _warden_crypt: Arc::new(SyncMutex::new(None)),
            _need_sync: false,
        }
    }

    pub fn init(&mut self, session_key: &[u8], warden_crypt: Arc<SyncMutex<Option<WardenCrypt>>>) {
        self._decryptor = Some(Decryptor::new(session_key));
        self._warden_crypt = warden_crypt;
        self._need_sync = true;
    }

    pub async fn read(&mut self) -> Result<IncomingPacket, Error> {
        let (opcode, body) = if let Some(decryptor) = self._decryptor.as_mut() {
            if self._need_sync {
                self._need_sync = false;

                let mut buffer = [0u8; 65536];
                let bytes_count = self._stream.read(&mut buffer).await?;

                let mut header_reader = Cursor::new(&buffer[2..4]);
                let opcode = ReadBytesExt::read_u16::<LittleEndian>(&mut header_reader)?;

                (opcode, buffer[4..bytes_count].to_vec())
            } else {
                // this block contains read_exact usages, so here is an issue
                let mut buffer = [0u8; 1];
                self._stream.read_exact(&mut buffer).await?;

                let mut first_byte = buffer[0];
                first_byte = decryptor.decrypt(&[first_byte])[0];

                let is_long_packet = first_byte >= 0x80;

                let buffer_size = if is_long_packet { 4 } else { 3 };
                let mut buffer = vec![0u8; buffer_size];
                self._stream.read_exact(&mut buffer).await?;

                let mut header = decryptor.decrypt(&buffer);
                if is_long_packet {
                    first_byte = first_byte & 0x7Fu8;
                }
                header.insert(0, first_byte);

                let mut header_reader = Cursor::new(&header);
                let size = if is_long_packet {
                    ReadBytesExt::read_u24::<BigEndian>(&mut header_reader)? as usize
                } else {
                    ReadBytesExt::read_u16::<BigEndian>(&mut header_reader)? as usize
                };

                let opcode = ReadBytesExt::read_u16::<LittleEndian>(&mut header_reader).unwrap();

                let mut body = vec![0u8; size - INCOME_WORLD_OPCODE_LENGTH];
                self._stream.read_exact(&mut body).await?;

                if opcode == Opcode::SMSG_WARDEN_DATA {
                    body = self._warden_crypt.lock().unwrap().as_mut().unwrap().decrypt(&body);
                }

                (opcode, body)
            }
        } else {
            let mut buffer = [0u8; 65536];
            let bytes_count = self._stream.read(&mut buffer).await?;
            let body = buffer[1..bytes_count].to_vec();
            let opcode = buffer[0] as u16;

            (opcode, body)
        };

        Ok(IncomingPacket { opcode, body })
    }
}
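
For reference, the header layout that the encrypted branch above decodes can be isolated into a small pure function, which makes it easier to test separately from the socket and the decryptor. This is only a sketch: `parse_header`, `Header` and `OPCODE_LEN` are hypothetical names, the input is assumed to be already decrypted, and the opcode length of 2 bytes is an assumption because `INCOME_WORLD_OPCODE_LENGTH` is not shown in the post.

```rust
/// Assumed length of the opcode field in bytes (stands in for the
/// INCOME_WORLD_OPCODE_LENGTH constant, which is not shown in the post).
const OPCODE_LEN: usize = 2;

#[derive(Debug, PartialEq)]
struct Header {
    body_len: usize,
    opcode: u16,
}

/// Parses an already-decrypted header: 4 bytes for a short packet,
/// 5 bytes for a long one (first byte >= 0x80).
/// Returns None if the slice is too short or the size field is smaller
/// than the opcode length.
fn parse_header(header: &[u8]) -> Option<Header> {
    let first = *header.first()?;
    let is_long = first >= 0x80;

    let (size, opcode_bytes) = if is_long {
        // 3-byte big-endian size with the marker bit masked off.
        let b = header.get(..3)?;
        let size = (((b[0] & 0x7F) as usize) << 16) | ((b[1] as usize) << 8) | b[2] as usize;
        (size, header.get(3..5)?)
    } else {
        // 2-byte big-endian size.
        let b = header.get(..2)?;
        let size = ((b[0] as usize) << 8) | b[1] as usize;
        (size, header.get(2..4)?)
    };

    // The opcode is little-endian; the size field includes the opcode bytes.
    let opcode = u16::from_le_bytes([opcode_bytes[0], opcode_bytes[1]]);
    let body_len = size.checked_sub(OPCODE_LEN)?;
    Some(Header { body_len, opcode })
}

fn main() {
    println!("{:?}", parse_header(&[0x00, 0x06, 0xDC, 0x01]));
    println!("{:?}", parse_header(&[0x80, 0x01, 0x02, 0xE6, 0x02]));
}
```

Using `checked_sub` here avoids the panic that `size - INCOME_WORLD_OPCODE_LENGTH` would cause in a debug build if a corrupted or misaligned header produced a size smaller than the opcode length.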

Could somebody tell me what could be wrong and how to fix this?

I put a println! at the beginning and at the end of the part with read_exact:

} else {
    println!("BUFFER BEFORE: {:?}", limit(self._stream.buffer()));
    let mut buffer = [0u8; 1];
    self._stream.read_exact(&mut buffer).await?;

    // ...

    println!("BUFFER NOW: {:?}", limit(self._stream.buffer()));

    (opcode, body)
}

and got the output:

BUFFER BEFORE: []
BUFFER NOW: [220, 166, 92, 23, 2, 1, 1, 195, 91, 80]
BUFFER BEFORE: [220, 166, 92, 23, 2, 1, 1, 195, 91, 80]
BUFFER NOW: [207, 54, 113, 90, 1, 0, 0, 0, 110, 84]
BUFFER BEFORE: [207, 54, 113, 90, 1, 0, 0, 0, 110, 84]
BUFFER NOW: [110, 84, 70, 97, 2, 0, 0, 0, 0, 2]
BUFFER BEFORE: [110, 84, 70, 97, 2, 0, 0, 0, 0, 2]
// ...

So it seems that on the next read_exact call the buffer still contains the previous data. Is there a way to clear the buffer immediately after read_exact is called?

Call consume.

"Tells this buffer that amt bytes have been consumed from the buffer, so they should no longer be returned in calls to read." (BufRead in std::io, Rust documentation)
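
To make the quoted behaviour concrete, here is a minimal sketch using the synchronous std::io::BufReader over an in-memory Cursor. That is an assumption made so the example runs without a socket or tokio; as far as I know tokio's BufReader follows the same model. The function names are hypothetical. It shows that consume pairs with fill_buf, and that read_exact already consumes what it returns, so an extra consume after read_exact skips bytes that were never read.

```rust
use std::io::{BufRead, BufReader, Cursor, Read};

/// fill_buf exposes the buffered bytes without consuming them;
/// consume then marks a prefix of them as read.
fn peek_then_consume() -> std::io::Result<(Vec<u8>, Vec<u8>)> {
    let mut reader = BufReader::new(Cursor::new(vec![1u8, 2, 3, 4, 5, 6]));
    let peeked = reader.fill_buf()?.to_vec();
    reader.consume(2);
    // Whatever is left in the buffer is still unread.
    Ok((peeked, reader.buffer().to_vec()))
}

/// read_exact consumes the bytes it hands out, so calling consume
/// afterwards throws away the *next* bytes in the buffer.
fn read_exact_then_consume() -> std::io::Result<([u8; 2], [u8; 2])> {
    let mut reader = BufReader::new(Cursor::new(vec![1u8, 2, 3, 4, 5, 6]));
    let mut first = [0u8; 2];
    reader.read_exact(&mut first)?;
    reader.consume(2); // drops bytes 3 and 4, which nobody has read
    let mut second = [0u8; 2];
    reader.read_exact(&mut second)?;
    Ok((first, second))
}

fn main() -> std::io::Result<()> {
    println!("{:?}", peek_then_consume()?);
    println!("{:?}", read_exact_then_consume()?);
    Ok(())
}
```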

I added consume at the end of this part (after the last read_exact call):

    let mut body = vec![0u8; size - INCOME_WORLD_OPCODE_LENGTH];
    self._stream.read_exact(&mut body).await?;

    self._stream.consume(size);

    // ...

    println!("BUFFER NOW: {:?}", limit(self._stream.buffer()));

    (opcode, body)
}

Now the buffer is empty at the beginning, so I cannot read anything. Probably I should do something else?

P.S. this is the code where I use my reader.

Put consume after the last buffer call, where you don't need the contents anymore.

It was put after the last _stream.read_exact. By the way, shouldn't consume be used only with fill_buf?

They're asking you to put it after the last call to self._stream.buffer(), since apparently this also uses the content of the stream.

Looking at your output, this is behaving as expected. BUFFER BEFORE shows that the buffer has been filled by a call to a read function, and BUFFER NOW shows that the buffered data has been consumed by the call to read_exact.

Taking it as pairs:

BUFFER BEFORE: []
BUFFER NOW: [220, 166, 92, 23, 2, 1, 1, 195, 91, 80]

You started with an empty buffer. You did your thing, now the buffer starts with 220 for the next iteration.

BUFFER BEFORE: [220, 166, 92, 23, 2, 1, 1, 195, 91, 80]
BUFFER NOW: [207, 54, 113, 90, 1, 0, 0, 0, 110, 84]

As expected, since you've not called any read function, the buffer at the beginning of this iteration contains the same data as the buffer at the end of the last iteration. You do a read, and the buffer contents change.

BUFFER BEFORE: [207, 54, 113, 90, 1, 0, 0, 0, 110, 84]
BUFFER NOW: [110, 84, 70, 97, 2, 0, 0, 0, 0, 2]

The pattern repeats itself: each iteration starts with the unread data from the last iteration already in the buffer, and ends with new buffer contents holding something that has not been read yet.

You should look for a different explanation.
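
The same pattern can be reproduced without a network connection. The sketch below is an illustration under assumptions, not the original code: it uses the synchronous std::io::BufReader over a Cursor, fixed-size "packets", and a hypothetical helper `read_packets` that records the buffer before and after each read_exact. Every packet comes out exactly once, while buffer() always shows the bytes that are still waiting to be read.

```rust
use std::io::{BufReader, Cursor, Read};

/// For every fixed-size packet in `stream`, records
/// (buffer before the read, packet returned, buffer after the read).
fn read_packets(
    stream: Vec<u8>,
    packet_len: usize,
) -> std::io::Result<Vec<(Vec<u8>, Vec<u8>, Vec<u8>)>> {
    let packet_count = stream.len() / packet_len;
    let mut reader = BufReader::new(Cursor::new(stream));
    let mut log = Vec::new();

    for _ in 0..packet_count {
        let before = reader.buffer().to_vec();
        let mut packet = vec![0u8; packet_len];
        reader.read_exact(&mut packet)?;
        let after = reader.buffer().to_vec();
        log.push((before, packet, after));
    }
    Ok(log)
}

fn main() -> std::io::Result<()> {
    for (before, packet, after) in read_packets(vec![1, 2, 3, 4, 5, 6], 2)? {
        println!("BUFFER BEFORE: {:?}", before);
        println!("PACKET:        {:?}", packet);
        println!("BUFFER NOW:    {:?}", after);
    }
    Ok(())
}
```

Note that "BUFFER NOW" of one iteration equals "BUFFER BEFORE" of the next, just like in the output above, and yet no packet is returned twice.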
