Understanding Framed in tokio

I'm following the tip given here and started looking at using tokio-util's Framed.

Basically I want to mix reading/writing lines and binary data, and I'm trying to wrap my head around the components in tokio-util.

First the basics: do Framed, FramedRead, and FramedWrite provide buffering, with the Decoder reading from (and draining) the Framed{Read} buffer and returning the decoded data to the application?

Let's say the application can receive two types of headers; one doesn't have trailing binary data and the other one can have trailing binary data [if a Len parameter has been included].

If I understand it correctly, the idea is that one writes a single Decoder that handles both the line-based part of the protocol and the binary part, so it is up to the decoder to keep state between calls. (I realize there are many ways to solve this, but I'm trying to understand the intended pattern of tokio-util::codec, if there is such a thing.)

The example code here calls next to produce the next inbound line, but where is the actual implementation for it? If I understand it right it ends up calling the decode() function here.

So here's the big question: should I write a decode() function that returns an enum InboundType (wrapped in the appropriate Result/Option), something like this:

enum HeaderType {
  Hello(HelloParams),
  Data(DataParams)
}

enum InboundType {
  Header(HeaderType),
  Binary(u32)   // Remaining data to read
}

So the async connection function would do this:

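// Build the Framed once, then pull decoded events from it
let mut framed = Framed::new(socket, MyCodec::new());
while let Some(evt) = framed.next().await {
  match evt? {
    InboundType::Header(params) => { /* check what type of header */ }
    InboundType::Binary(remain) => { /* read "raw" data from Framed and save to file */ }
  }
  // ...
}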

Have I parsed the documentation/code somewhat right?

I will focus on FramedRead as that appears to be the direction that is relevant here. I will try to explain the basic idea behind the use of this tool, which should answer several of your questions.

The basic loop inside FramedRead is the following:

  1. Read data from underlying AsyncRead and append it to an internal buffer.
  2. Pass the internal buffer to the Decoder. If it returns an item, the Stream impl on FramedRead returns that item.

This is all it does. The Decoder should modify the internal buffer to remove data that is no longer needed; the FramedRead won't modify it in any way besides appending to it.
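The two steps above can be modelled with nothing but the standard library. This is only a sketch of the idea, not tokio-util's code: it is synchronous, it uses a Vec<u8> where the real thing uses BytesMut, and the names read_frames and decode_line are made up for illustration. It also simply drops leftover bytes at end of input, whereas the real Decoder trait has a decode_eof hook for that case.

```rust
use std::io::{self, Read};

/// Hypothetical, synchronous stand-in for the FramedRead loop.
/// `decode` plays the role of `Decoder::decode`.
pub fn read_frames<R: Read, T>(
    mut io: R,
    mut decode: impl FnMut(&mut Vec<u8>) -> io::Result<Option<T>>,
) -> io::Result<Vec<T>> {
    let mut buf = Vec::new();
    let mut items = Vec::new();
    let mut chunk = [0u8; 4]; // tiny on purpose, so items span several reads
    loop {
        // Step 2: keep asking the decoder until it says "not enough data".
        while let Some(item) = decode(&mut buf)? {
            items.push(item);
        }
        // Step 1: read more bytes and append them to the internal buffer.
        let n = io.read(&mut chunk)?;
        if n == 0 {
            return Ok(items); // end of input; leftover bytes are dropped here
        }
        buf.extend_from_slice(&chunk[..n]);
    }
}

/// Example decoder: one item per '\n'-terminated line.
pub fn decode_line(buf: &mut Vec<u8>) -> io::Result<Option<String>> {
    match buf.iter().position(|&b| b == b'\n') {
        Some(pos) => {
            // Remove exactly one line (including the '\n') from the buffer.
            let line: Vec<u8> = buf.drain(..=pos).collect();
            Ok(Some(String::from_utf8_lossy(&line[..pos]).into_owned()))
        }
        // Not enough data yet: leave the buffer untouched.
        None => Ok(None),
    }
}
```

Note that the loop never removes anything from buf itself; only decode_line does.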

So the idea is this: You write a Decoder. The job of the decoder is to look into a buffer of data, and determine if the buffer contains enough data to produce a full item in the stream.

If the buffer does not contain enough data to produce an item, it returns Ok(None) and leaves the buffer unmodified. In this case, the containing FramedRead will read more data from the underlying AsyncRead, append the bytes to the buffer, and call decode again later, hopefully with enough data to produce an item.

If the buffer contains enough data for the next item, the decoder should remove those bytes from the buffer and return the item as Ok(Some(item)). The returned item becomes the next item in the stream. The decoder should only remove one item for each call to decode, as the FramedRead will repeatedly ask the decoder for the next item until the decoder says there is not enough data.

When the decoder returns Ok(None), and you know how long the next item is going to be, it's a good idea to call reserve on the buffer so that the FramedRead will ask for that much data on the next call to the IO resource. That said, FramedRead calls the decoder every time more data is available; it won't wait until the buffer is completely filled.

If the decoder returns an error, the stream is terminated.
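To make the rules above concrete for a mixed line/binary protocol, here is a rough sketch of a stateful decoder. Everything protocol-specific is an assumption for illustration: the "DATA <len>" header syntax, the Inbound and MixedDecoder names, and the String error type. It works on a plain Vec<u8> so it runs without any crates; in tokio-util the same body would live in Decoder::decode, which takes a &mut BytesMut.

```rust
/// Hypothetical item type: a header line or a complete binary payload.
#[derive(Debug, PartialEq)]
pub enum Inbound {
    Header(String),
    Binary(Vec<u8>),
}

/// Remembers whether the previous header announced a payload.
#[derive(Default)]
pub struct MixedDecoder {
    pending: Option<usize>, // payload bytes still expected, if any
}

impl MixedDecoder {
    pub fn new() -> Self {
        Self::default()
    }

    /// Same contract as `Decoder::decode`: at most one item per call.
    pub fn decode(&mut self, buf: &mut Vec<u8>) -> Result<Option<Inbound>, String> {
        if let Some(len) = self.pending {
            if buf.len() < len {
                // Not enough data: hint at the size and leave the buffer alone.
                buf.reserve(len - buf.len());
                return Ok(None);
            }
            let payload: Vec<u8> = buf.drain(..len).collect();
            self.pending = None;
            return Ok(Some(Inbound::Binary(payload)));
        }

        let pos = match buf.iter().position(|&b| b == b'\n') {
            Some(pos) => pos,
            None => return Ok(None), // no complete line yet
        };
        let mut raw: Vec<u8> = buf.drain(..=pos).collect();
        raw.pop(); // drop the trailing '\n'
        let line = String::from_utf8(raw).map_err(|e| e.to_string())?;

        // Assumed header syntax: "DATA <len>" announces <len> payload bytes.
        if let Some(len) = line.strip_prefix("DATA ") {
            let len: usize = len
                .trim()
                .parse()
                .map_err(|_| format!("bad length in {line:?}"))?;
            if len > 0 {
                self.pending = Some(len);
            }
        }
        Ok(Some(Inbound::Header(line)))
    }
}
```

The only state kept in the decoder is the pending length; the partially received payload just stays in the buffer between calls.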


Now for your questions.

  1. Does FramedRead provide buffering? Yes.
  2. Should the item type be an enum? In your case, yes.
  3. Should the Decoder keep track of state? Yes, if it is needed. Often it is enough to just leave the half-completed item in the buffer; it will still be there on the next call.
  4. Where is next defined? On the StreamExt trait. It uses the Stream impl on FramedRead to extract items.

I was also looking at FramedRead recently. This explanation would have saved me a lot of trouble. Thanks for the clear description.


I will put it on my list to improve the documentation on the crate itself.

Hmm... Should this be interpreted as "don't remove anything off the input buffer until you're ready to return something to the application"?

As I mentioned previously, I'm looking to read all lines until an empty line is encountered. While I could wait until a double linefeed has been received, I would prefer (for various reasons) to extract them line by line internally until an empty line has been received.

Modifying the buffer without returning an item is also fine, but it is rather uncommon to do it like that as it requires extra bookkeeping, which you might not need otherwise. The FramedRead doesn't care. It just appends.
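For illustration, a sketch of that less common style: the decoder consumes complete lines as they arrive, stores them itself, and only returns an item once the empty line shows up. The HeaderBlockDecoder name is hypothetical, and a Vec<u8> again stands in for BytesMut so the snippet needs no crates. The extra bookkeeping is the lines field.

```rust
use std::string::FromUtf8Error;

/// Hypothetical decoder that yields a whole block of header lines at once.
#[derive(Default)]
pub struct HeaderBlockDecoder {
    lines: Vec<String>, // lines already removed from the buffer
}

impl HeaderBlockDecoder {
    pub fn decode(&mut self, buf: &mut Vec<u8>) -> Result<Option<Vec<String>>, FromUtf8Error> {
        // Consume every complete line currently in the buffer.
        while let Some(pos) = buf.iter().position(|&b| b == b'\n') {
            let mut raw: Vec<u8> = buf.drain(..=pos).collect();
            raw.pop(); // drop the trailing '\n'
            let line = String::from_utf8(raw)?;
            if line.is_empty() {
                // Empty line ends the block: hand over what was collected
                // and leave `self.lines` empty for the next block.
                return Ok(Some(std::mem::take(&mut self.lines)));
            }
            self.lines.push(line);
        }
        // The buffer was modified, but there is no item yet.
        Ok(None)
    }
}
```

Any bytes after the empty line stay in the buffer for the next call, so the one-item-per-call rule still holds.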

