How to efficiently parse and mutate BytesMut

I'm trying to read a file using Tokio, so the file arrives in pieces in BytesMut. The file contains messages, where each message ends with the sequence 0x7f 0x80, while a literal 0x7f in the file is escaped as 0x7f 0x7f. I need to remove escape sequences, detect (and stop at) the end-of-message sequence, and pass buffers-full of the un-escaped file to the consumer (which might stream the buffers over a network connection).
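To make the format concrete, here is a minimal sketch of the writing side, using only the standard library. The name encode_message is made up for illustration and is not part of the code in question; it only assumes the two rules described above (double every literal 0x7f, end each message with 0x7f 0x80).

```rust
/// Hypothetical helper: frames one message using the escaping rules above.
fn encode_message(payload: &[u8]) -> Vec<u8> {
    let mut out = Vec::with_capacity(payload.len() + 2);
    for &b in payload {
        if b == 0x7F {
            out.push(0x7F); // escape a literal 0x7F by doubling it
        }
        out.push(b);
    }
    out.extend_from_slice(&[0x7F, 0x80]); // end-of-message marker
    out
}
```

Under these rules the payload 01 7F 02 would be written as 01 7F 7F 02 7F 80, and the reader has to undo exactly that.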

I've written this unescape() function, but it looks inelegant and inefficient. Is there a way to do this better?

use bytes::{Buf, BufMut, BytesMut};
use std::io::Cursor;

struct FileReader {
    src: BytesMut,         // incoming data from the file
    saw_escape_byte: bool, // starts as false
    dst: BytesMut,         // outgoing data (e.g. to network)
}
impl FileReader {
    /// Transfer src to dst, removing escape sequences and stopping at
    /// end-of-message. Returns true if it found end-of-message.
    fn unescape(&mut self) -> bool {
        let mut cursor = Cursor::new(&self.src[..]);
        while cursor.has_remaining() && self.dst.remaining_mut() > 0 {
            let b = cursor.get_u8();
            if self.saw_escape_byte {
                // This is the byte after an escape byte.
                self.saw_escape_byte = false;
                if b == 0x80 {
                    // 0x7f 0x80 is an end-of-message marker.
                    return true;
                } else {
                    // Anything else is an escaped literal value.
                    self.dst.put_u8(b);
                }
            } else if b == 0x7F {
                // This is an escape byte. Don't pass it through, but do set the flag.
                self.saw_escape_byte = true;
            } else {
                // Pass a non-escaped value through.
                self.dst.put_u8(b);
            }
        }
        self.src.advance(cursor.position() as usize);
        false
    }
}

Here it is in the playground.

Thanks in advance!

Is the "file" composed of multiple messages or of just one message possibly followed by data to be discarded? I'd expect the former, but your playground code stops processing input data after the first message ends, and it doesn't look like unescape() advances src properly when 0x7F 0x80 is detected.

Personally, I'd suggest looking at the Decoder trait from tokio-util and implementing either that or a similar API; i.e.:

  • FileReader should only store saw_escape_byte
  • It should have a method that takes a BytesMut containing a piece of received data and extracts & returns the next message unescaped; each time you read a piece of data, you add the piece to src, call this method on src until it returns None, and then you read another piece, append to src, and repeat.
  • It should have some method to call on end-of-input so that unterminated messages and trailing escapes can be caught (assuming you consider those to be errors)
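The shape described in the list above could be sketched roughly as follows. This is not the tokio-util Decoder trait itself: MessageDecoder, DecodeError, and finish are hypothetical names, and Vec<u8> stands in for BytesMut so the sketch has no dependencies. One assumption worth flagging: because this version leaves an incomplete message in src and rescans it on the next call, it ends up needing no saved state at all; a saw_escape_byte flag would only matter if partial output were handed out early.

```rust
/// Hypothetical error type for input that ends mid-message.
#[derive(Debug, PartialEq)]
enum DecodeError {
    UnterminatedMessage,
}

/// Hypothetical stand-in for a tokio-util style decoder, using Vec<u8>
/// instead of BytesMut so the sketch has no dependencies.
struct MessageDecoder;

impl MessageDecoder {
    /// Removes and returns the next complete message from `src`, unescaped.
    /// Returns None and leaves `src` untouched if no end marker has arrived.
    fn decode(&mut self, src: &mut Vec<u8>) -> Option<Vec<u8>> {
        let mut msg = Vec::new();
        let mut i = 0;
        while i < src.len() {
            if src[i] != 0x7F {
                msg.push(src[i]);
                i += 1;
                continue;
            }
            // An escape byte needs its follower before we can decide.
            let next = *src.get(i + 1)?;
            if next == 0x80 {
                src.drain(..i + 2); // consume the message and its marker
                return Some(msg);
            }
            msg.push(next); // escaped literal
            i += 2;
        }
        None
    }

    /// Call once at end of input: leftover bytes mean a truncated message
    /// or a trailing escape byte.
    fn finish(&mut self, src: &[u8]) -> Result<(), DecodeError> {
        if src.is_empty() {
            Ok(())
        } else {
            Err(DecodeError::UnterminatedMessage)
        }
    }
}
```

The caller would append each piece read from the file to src, call decode in a loop until it returns None, and call finish once the file is exhausted.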

Yes, sorry, I should have been clearer. The file contains multiple messages, each of which ends with 7F 80. I stopped the playground code at the end of the first message just to keep main() simple but had the start of the second message there to make sure unescape() correctly returned on end-of-message.

Thanks for the pointer to the Decoder trait! I'll check it out!

Ok, the Decoder trait is very cool! But I'm not sure how to implement decode() any more cleanly than I did unescape().

One other thing I should clarify is that I'm trying not to require that an entire message can fit in RAM at once. That's actually one of the reasons they use end markers instead of length fields. (It also simplifies compression/decompression and encryption/decryption on the fly.)
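For that bounded-memory requirement, one possible shape is a chunk-at-a-time unescaper that carries only the escape flag between calls and copies each run of ordinary bytes in a single call instead of byte by byte. This is a sketch under assumptions, not a drop-in for the code above: Unescaper and feed are hypothetical names, and Vec<u8> and &[u8] stand in for BytesMut so it runs with the standard library alone.

```rust
/// Hypothetical streaming unescaper; the only state carried across chunks
/// is whether the previous chunk ended on an escape byte.
#[derive(Default)]
struct Unescaper {
    saw_escape_byte: bool,
}

impl Unescaper {
    /// Unescapes `chunk` into `out`, stopping at an end-of-message marker.
    /// Returns (bytes consumed from `chunk`, whether end-of-message was found).
    fn feed(&mut self, chunk: &[u8], out: &mut Vec<u8>) -> (usize, bool) {
        let mut pos = 0;
        while pos < chunk.len() {
            if self.saw_escape_byte {
                // This is the byte after an escape byte.
                self.saw_escape_byte = false;
                let b = chunk[pos];
                pos += 1;
                if b == 0x80 {
                    return (pos, true);
                }
                out.push(b);
                continue;
            }
            // Copy the run up to the next escape byte in one call.
            let rest = &chunk[pos..];
            let run = rest.iter().position(|&b| b == 0x7F).unwrap_or(rest.len());
            out.extend_from_slice(&rest[..run]);
            pos += run;
            if pos < chunk.len() {
                self.saw_escape_byte = true; // chunk[pos] is 0x7F
                pos += 1;
            }
        }
        (pos, false)
    }
}
```

The caller can flush out to the consumer whenever it likes, so memory use is bounded by the chunk size rather than the message size. When feed reports end-of-message, the unconsumed tail of the chunk belongs to the next message.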

I'm not too familiar with the bytes crate, but I'd write it in terms of slice::windows:

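use bytes::{Buf, BufMut, BytesMut};

/// Extracts the next complete message from `src`, unescaped, and consumes
/// it together with its end marker. Returns None, leaving `src` unchanged,
/// if `src` does not yet contain an end marker.
fn unescape(src: &mut BytesMut) -> Option<BytesMut> {
    let mut res = BytesMut::with_capacity(src.len());
    let mut iter = src.windows(2).enumerate();
    while let Some((idx, window)) = iter.next() {
        match (window[0], window[1]) {
            (0x7F, 0x80) => {
                // End-of-message: consume everything through the marker,
                // which occupies positions idx and idx + 1.
                src.advance(idx + 2);
                return Some(res);
            }
            (0x7F, b) => {
                // Escaped literal: emit it and skip the window that
                // starts at the escaped byte.
                res.put_u8(b);
                iter.next();
            }
            (b, _) => {
                // Ordinary byte: pass it through.
                res.put_u8(b);
            }
        }
    }
    None
}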
