Packet reassembly

I have a Rust program that reads data from a device over a serial port. That data was nicely delimited into "packets" with a start sequence, length, checksum, etc. Each packet contained a "frame" of data made up of various binary blobs that I decode. Nice and simple, life was good.

I say "was" because the device vendor has "upgraded" the protocol such that those low level packets now cary much larger "frames" that can be split over multiple packets. So I now have a need to add a fame reassembler that glues incoming packets together and when there is enough for a data buffered up for a complete frame removes the frame worth of bytes from the buffer for actual parsing of the data within.

Basically I need to add bytes from incoming packets to the back of a buffer and remove frame-sized chunks of bytes from the front of the buffer whenever it holds enough for a complete frame.

That's easy enough, I thought, and came up with this:

#[derive(Debug, PartialEq)]
pub struct PacketAssemblyBuffer {
    buffer: Vec<u8>,
}

impl PacketAssemblyBuffer {
    // Create a new PacketAssemblyBuffer
    pub fn new() -> PacketAssemblyBuffer {
        PacketAssemblyBuffer {
            buffer: Vec::<u8>::new(),
        }
    }

    // Put new bytes onto back of buffer.
    pub fn put(&mut self, data: &mut Vec<u8>) {
        self.buffer.append(data);
    }

    // Obtain an immutable view of the buffered bytes.
    pub fn get_ref(&self) -> &[u8] {
        &self.buffer
    }

    // Remove and return the first `at` bytes from the front of the buffer.
    pub fn consume(&mut self, at: usize) -> Option<Vec<u8>> {
        if at <= self.buffer.len() {
            // split_off leaves the first `at` bytes in self.buffer and
            // returns the rest, so swap the two halves over.
            let tail = self.buffer.split_off(at);
            let head = std::mem::replace(&mut self.buffer, tail);
            Some(head)
        } else {
            None
        }
    }
}

That's simple and seems to work well enough. But all those allocations going on bug me.

So I thought about just using a single big vector of bytes. Data would accumulate in that buffer until it hit the end of the vector. Data would be "removed" from the front (low end) of the vector, with some indices to keep track of the "head" and "tail" of the live data. On hitting the end of the vector, the live data would just be copied down to the start and the "head"/"tail" indices adjusted.
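
Roughly this sort of thing (just a sketch to illustrate the idea, untested, names made up):

struct CompactingBuffer {
    buf: Vec<u8>, // fixed-size storage
    head: usize,  // index of the first live byte
    tail: usize,  // index one past the last live byte
}

impl CompactingBuffer {
    fn with_capacity(cap: usize) -> Self {
        CompactingBuffer { buf: vec![0; cap], head: 0, tail: 0 }
    }

    // Append incoming packet bytes (assumes they fit once compacted).
    fn put(&mut self, data: &[u8]) {
        if self.tail + data.len() > self.buf.len() {
            // No room at the end: copy the live data down to the start.
            self.buf.copy_within(self.head..self.tail, 0);
            self.tail -= self.head;
            self.head = 0;
        }
        self.buf[self.tail..self.tail + data.len()].copy_from_slice(data);
        self.tail += data.len();
    }

    // Borrow a frame's worth of bytes from the front, if available.
    fn consume(&mut self, cnt: usize) -> Option<&[u8]> {
        if cnt <= self.tail - self.head {
            let frame = &self.buf[self.head..self.head + cnt];
            self.head += cnt;
            Some(frame)
        } else {
            None
        }
    }
}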

This all starts to sound like a fiddly job that someone must have done already as a crate.

Any suggestions?

The standard library's VecDeque serves this purpose. The design is slightly different: it wraps around from the end to the start instead of copying (so the data is sometimes noncontiguous, but you can ask it to rearrange explicitly when needed).
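
A minimal sketch of how that could slot into your struct (the method names just mirror yours, untested):

use std::collections::VecDeque;

struct FrameBuffer {
    buffer: VecDeque<u8>,
}

impl FrameBuffer {
    fn new() -> FrameBuffer {
        FrameBuffer { buffer: VecDeque::new() }
    }

    // Put new bytes onto the back of the buffer.
    fn put(&mut self, data: &[u8]) {
        self.buffer.extend(data.iter().copied());
    }

    // Rearrange into one contiguous run and borrow it for inspection.
    fn get_ref(&mut self) -> &[u8] {
        self.buffer.make_contiguous()
    }

    // Remove and return bytes from the front of the buffer.
    fn consume(&mut self, cnt: usize) -> Option<Vec<u8>> {
        if cnt <= self.buffer.len() {
            Some(self.buffer.drain(..cnt).collect())
        } else {
            None
        }
    }
}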

The BytesMut type in bytes does exactly this. You can push a slice using BytesMut::extend_from_slice and you can remove from the front with BytesMut::advance. It will keep track of how far into the slice you have advanced, and when it reallocates it will either move stuff to the front of the same allocation, or move only the active part to the start of a new allocation depending on various details.

What, you mean I can reduce the whole thing down to:

use bytes::Buf;      // Provides `advance`
use bytes::BytesMut; // Provides `extend_from_slice`

#[derive(Debug, PartialEq)]
pub struct PacketAssemblyBuffer {
    buffer: BytesMut,
}

impl PacketAssemblyBuffer {
    // Create a new PacketAssemblyBuffer
    pub fn new() -> PacketAssemblyBuffer {
        PacketAssemblyBuffer {
            buffer: BytesMut::with_capacity(1024),
        }
    }

    // Put new bytes onto end of buffer.
    pub fn put(&mut self, data: &[u8]) {
        self.buffer.extend_from_slice(data);
    }

    // Obtain an immutable view of the buffered bytes.
    pub fn get_ref(&self) -> &[u8] {
        &self.buffer
    }

    // Remove bytes from the front of buffer.
    pub fn remove(&mut self, cnt: usize) -> Option<()> {
        if cnt <= self.buffer.len() {
            self.buffer.advance(cnt);
            Some(())
        } else {
            None
        }
    }
}

Awesome.

Oddly enough I had a browse through the 'BytesMut' documentation and was too dumb to realise it would shuffle the buffer content around like that. Actually rereading the docs now it's still not entirely obvious.

What I'm wondering now is how big that buffer will get. How does it know when to shuffle bytes around rather than make a new allocation?

Anyway, seems to work. Where would I be without you Alice?

It's very possible that the docs are bad.

The shuffling/reallocation happens in extend_from_slice when you reach the end of the buffer. It will only ever reallocate if the extra space obtained by shuffling data around is too small to fit the slice you are pushing.

(It will also reallocate if you have used one of the split methods to create multiple Bytes or BytesMut objects sharing the same allocation, but you aren't currently doing that.)
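
If you want to watch it happen, a throwaway experiment along these lines (not your real code; exact numbers depend on the bytes version) prints len and capacity around an advance followed by a push that only fits once the live bytes are shuffled back to the start:

use bytes::{Buf, BytesMut};

fn main() {
    let mut buf = BytesMut::with_capacity(16);
    buf.extend_from_slice(&[0u8; 12]);
    buf.advance(8); // 4 live bytes, 8 bytes of dead space at the front
    println!("len = {}, capacity = {}", buf.len(), buf.capacity());

    // 10 more bytes don't fit after the live bytes, but they do fit once
    // the 4 live bytes are moved to the start. If that shuffle happens
    // instead of a reallocation, the capacity should come back to the
    // original 16 rather than growing.
    buf.extend_from_slice(&[0u8; 10]);
    println!("len = {}, capacity = {}", buf.len(), buf.capacity());
}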

Sounds reasonable.

Excellent, thanks.

It is called "binary parsing", I think Rust already has some sort of parser combinators for doing this at least at a byte granularity level.

Also, I have used the ragel tool to write custom multi-protocol parsers for microcontrollers. It let me mix two custom vendor protocols with a classical MODBUS RTU implementation. If your packet formats are not complex, it may be simpler to bind the generated C parser to your Rust code.

I'd also like a native Rust tool/library to replace something like https://kaitai.io/, or, even better, a "symmetric binary parser generator" that can do both parsing and data packaging.

Can anyone recommend tutorials on writing bit-level binary stream parsers, which would let me parse groups of single bits in a declarative manner?

There are plenty of tutorials about classical text parsing, starting with the Dragon book, but binary stream parsing and symmetric parser generators are not nearly as well covered.

That is exactly what I have been doing: pulling out all kinds of bits and bytes from messages and turning them into useful objects.

It does not help that when I first did this, over a year ago, the vendor did not have any documentation, only some example C# code that did the job. I just reimplemented that in Rust, line for line, by hand, resulting in code that looked like this, for example:

object.mode = buffer[0] & 0x1;
if object.mode == 0 {
    // Extract bit fields that straddle byte boundaries and scale them
    // to engineering units.
    let mut r1 = f32::from(buffer[1] & 0x3F) * 128.0;
    r1 += f32::from(buffer[0]) / 2.0;
    r1 -= RANGE_OFFSET;
    r1 *= RANGE_RESOLUTION;
    r1 = f32::two_decimals(r1);

    let mut r2 = f32::from(buffer[3] & 0x7) * 1024.0;
    r2 += f32::from(buffer[2]) * 4.0;
    r2 += f32::from(buffer[1] & 0xC0) / 64.0;
    r2 -= RANGE_OFFSET;
    r2 *= RANGE_RESOLUTION;
    r2 = f32::two_decimals(r2);

    let mut t1 = f32::from(buffer[4] & 0x3F) * 32.0;
    t1 += f32::from(buffer[3]) / 8.0;
    t1 -= TEMP_OFFSET;
    t1 *= TEMP_RESOLUTION;
    t1 = f32::two_decimals(t1);

    let mut t2 = f32::from(buffer[6] & 0x1) * 1024.0;
    t2 += f32::from(buffer[5]) * 4.0;
    t2 += f32::from(buffer[4]) / 64.0;
    t2 -= TEMP_OFFSET;
    t2 *= TEMP_RESOLUTION;
    t2 = f32::two_decimals(t2);

    let siz = f32::from(buffer[6] >> 1) * SIZE_RESOLUTION;
    let id = buffer[7];
    ...

That is after detecting message packets in the incoming byte stream, checking their length, header and payload checksums etc.

Now the vendor has a newer protocol. At least the fields are now a whole number of bytes in size and aligned on byte boundaries.

But now they have slipped in a new feature: the message objects are much bigger and can be split over multiple packets. Undocumented, of course, but I think I have figured out what they are doing. Hence my need to shoehorn a packet reassembler in there.

Proprietary protocols are such fun. Luckily I don't need to also generate this junk.

Likely I'd get the job done quicker by hand-crafting the parser than by trying to figure out how to use whatever combinators. I hardly know what a combinator is.
