Is there something similar to Framed Codecs for creating Stream adapters?

Hi everyone!

I've used Framed from Tokio to implement a Codec that converts from bytes into Frames. This works really well, is simple and clean to implement and is quite performant.

I was wondering if there is an equivalent to Framed but for creating Stream adapters. For example, rather than reading in bytes and returning frames, I'd like to read in bytes, process them and then return bytes. This would be like a "bump in the wire" so to speak.

I know I can get this by implementing AsyncRead and AsyncWrite, but there is a fair bit of complexity in implementing the poll family of functions that I'd rather avoid if I can.

I'd welcome any thoughts, comments or new directions to go in :slight_smile:

Thanks!

Not really. The best option I have for you is to use a codec whose item type is a chunk of bytes, which gives you a Stream of byte chunks, and then wrap that stream in StreamReader (from tokio-util) to get an AsyncRead back.

Cool, at least I'm not missing anything obvious :slight_smile:

I read your previous discussion on implementing AsyncRead and AsyncWrite where you described it as "hard mode" and that was enough to make me think twice about implementing my own :slight_smile:

I did have a play with wrapping Tokio's TcpStream so that I could use poll_read but it seems that poll_read is private (although poll_peek is not).

The TcpStream type does have a public poll_read method. It comes from the AsyncRead trait, so the trait has to be in scope for the call to resolve.

If you say more about what kind of transformation you want to do, I can say something about how easy it would be.

For example, let's say I have a TCP based protocol, and I use Framed with an appropriate codec to turn the stream of bytes from TcpStream into individual messages. Now, if I need to XOR each byte in the stream, currently the easiest way would be to add the XOR logic to the Codec. The downside with this is that it tightly couples the Codec with the need for XOR. If I later on need to decrypt something first, I would have to create a new Codec or at least add some discerning logic to the existing one. What I would prefer to do is keep the framing logic separate.

So ideally what that would look like (based on my current understanding) would be something like Framed but it provides two buffers - one with the incoming data, and one to write the outgoing data. This way I can put an arbitrary function anywhere into a stream to transform it, and keep that logic out of my codec.

That way a developer can just focus on the traits needed to make the struct compliant, and let the Framed equivalent handle the heavy lifting. It could be a fun project to get better acquainted with this area though :slight_smile:
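To make the two-buffer idea concrete, here is a minimal std-only sketch of what such an interface might look like. Everything in it (`ByteTransform`, `Xor`, `SwapPairs`, `Pipeline`) is a hypothetical name invented for illustration, not something Tokio provides, and it is synchronous so that it stays self-contained. A stage may leave bytes in `src` if it needs more input before it can produce output, much like a Decoder does.

```rust
/// Hypothetical trait: consume bytes from `src`, append results to `dst`.
/// Bytes left in `src` are offered again on the next call.
pub trait ByteTransform {
    fn transform(&mut self, src: &mut Vec<u8>, dst: &mut Vec<u8>);
}

/// XORs every byte with a fixed key; always consumes all of `src`.
pub struct Xor(pub u8);

impl ByteTransform for Xor {
    fn transform(&mut self, src: &mut Vec<u8>, dst: &mut Vec<u8>) {
        let key = self.0;
        dst.extend(src.drain(..).map(|b| b ^ key));
    }
}

/// Swaps each pair of bytes; an odd trailing byte waits in `src`.
pub struct SwapPairs;

impl ByteTransform for SwapPairs {
    fn transform(&mut self, src: &mut Vec<u8>, dst: &mut Vec<u8>) {
        let even = src.len() / 2 * 2;
        let taken: Vec<u8> = src.drain(..even).collect();
        for pair in taken.chunks_exact(2) {
            dst.push(pair[1]);
            dst.push(pair[0]);
        }
    }
}

/// Chains stages; each stage owns the buffer of bytes waiting to enter it.
pub struct Pipeline {
    stages: Vec<(Box<dyn ByteTransform>, Vec<u8>)>,
}

impl Pipeline {
    pub fn new(stages: Vec<Box<dyn ByteTransform>>) -> Self {
        Pipeline {
            stages: stages.into_iter().map(|s| (s, Vec::new())).collect(),
        }
    }

    /// Push `input` into the first stage and return what leaves the last one.
    pub fn feed(&mut self, input: &[u8]) -> Vec<u8> {
        let mut carry = input.to_vec();
        for (stage, pending) in self.stages.iter_mut() {
            // Move the previous stage's output into this stage's input buffer;
            // `carry` is empty afterwards and collects this stage's output.
            pending.append(&mut carry);
            stage.transform(pending, &mut carry);
        }
        carry
    }
}
```

With stages `Xor(0x01)` then `SwapPairs`, feeding three bytes yields two bytes of output and holds the third back until the next call. The framing codec would then sit after the last stage and never need to know about XOR.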

The specific case of applying an XOR to the data would not be too difficult to implement manually via the IO traits. However, this relies on the fact that:

  1. XOR does not change the number of bytes.
  2. XOR does not have a minimum chunk size. (i.e. you can work on single bytes one at a time)

If you use any other type of encryption for which either of the above fails, then it becomes a lot more difficult.
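As a rough illustration of why those two properties make XOR the easy case, here is a sketch using the synchronous `std::io::Read` trait as a stand-in for AsyncRead (an assumption made to keep the example self-contained; `XorReader` and `xor_all` are hypothetical names). Because the byte count never changes and any chunk size works, the adapter can transform whatever the inner reader returned, in place, with no extra buffering.

```rust
use std::io::{self, Read};

/// Wraps any reader and XORs every byte that passes through with `key`.
pub struct XorReader<R> {
    inner: R,
    key: u8,
}

impl<R: Read> XorReader<R> {
    pub fn new(inner: R, key: u8) -> Self {
        XorReader { inner, key }
    }
}

impl<R: Read> Read for XorReader<R> {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        // Let the inner reader fill as much of `buf` as it can.
        let n = self.inner.read(buf)?;
        // Transform only the bytes that were actually read, in place.
        for byte in &mut buf[..n] {
            *byte ^= self.key;
        }
        // Same number of bytes out as in, so `n` is passed through unchanged.
        Ok(n)
    }
}

/// Hypothetical helper: read all of `input` through the adapter.
pub fn xor_all(input: &[u8], key: u8) -> io::Result<Vec<u8>> {
    let mut out = Vec::new();
    XorReader::new(input, key).read_to_end(&mut out)?;
    Ok(out)
}
```

A cipher that works on fixed-size blocks or adds padding could not be written this way: the adapter would need to hold back partial blocks and report a different byte count than the inner reader did.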

Does StreamExt::scan help you at all? The closure you give it can keep arbitrary state, can output an arbitrary type, and isn't required to return a new value for each incoming item. The return value of the scan call is still a Stream, so you can easily chain multiple calls together to keep the steps of your processing independent.
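The shape of that approach can be sketched with the standard library's `Iterator::scan`, used here as a synchronous analogue so the example needs no external crates. The `reblock` function is a hypothetical name. It regroups arbitrarily sized chunks into fixed-size blocks, keeping leftovers in the scan state. With `Iterator::scan`, returning `None` ends the iteration, so "nothing to emit yet" is expressed as an empty `Vec` that a later step filters out. As far as I know, `StreamExt::scan` in the `futures` crate has the same shape, except that the closure returns a future resolving to an `Option`, so check its documentation before relying on this.

```rust
/// Regroups arbitrarily sized chunks into chunks whose length is a multiple
/// of `block`, carrying leftover bytes between items in the scan state.
pub fn reblock(chunks: Vec<Vec<u8>>, block: usize) -> Vec<Vec<u8>> {
    assert!(block > 0, "block size must be non-zero");
    chunks
        .into_iter()
        .scan(Vec::new(), move |pending: &mut Vec<u8>, chunk: Vec<u8>| {
            pending.extend_from_slice(&chunk);
            // Take every complete block; the remainder stays in the state.
            let full = pending.len() / block * block;
            let ready: Vec<u8> = pending.drain(..full).collect();
            // `None` would stop the iterator, so emit an empty Vec instead.
            Some(ready)
        })
        // Independent follow-up step: drop the "nothing yet" placeholders.
        .filter(|ready| !ready.is_empty())
        .collect()
}
```

Note that this sketch silently drops a trailing partial block when the input ends. A real adapter would need to decide whether to flush or reject it.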

That looks interesting, I hadn't spotted that. I'll have a play. Thanks! :relaxed:

Does that difficulty come from trying to maintain state by any chance? The reason I ask is that, in AsyncRead, I can't find a way to write to a buffer in Pin<&mut Self> which would act like intermediary state.

It seems like I can either pin the inner stream to call poll_read on it, or I can get a mutable reference to the buffer. As I need to read the data before I can write it to that buffer, I'm somewhat limited in what I can do.

So, have I missed something obvious or am I experiencing the difficulty you warned me about? :slight_smile:

Well, adding some extra state is how you would handle it. It's just somewhat annoying to do so.

How does your code look now? Perhaps you should try using Pin::into_inner and accessing the fields via the returned mutable reference?
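Here is a rough sketch of that suggestion. It assumes the wrapped reader is `Unpin`, and it uses a simplified, hypothetical `PollRead` trait in place of Tokio's AsyncRead (the real trait takes a `ReadBuf` rather than a plain slice) so that it compiles with the standard library alone. `MemSource`, `XorAdapter` and `read_once` are likewise made-up names. The point is only the borrowing pattern: unwrap the pin once, then borrow the inner reader and the state fields separately.

```rust
use std::io;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical, simplified stand-in for an async read trait.
pub trait PollRead {
    fn poll_read(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8])
        -> Poll<io::Result<usize>>;
}

/// Toy source that serves bytes from memory and is always ready.
pub struct MemSource { pub data: Vec<u8>, pub pos: usize }

impl PollRead for MemSource {
    fn poll_read(self: Pin<&mut Self>, _cx: &mut Context<'_>, buf: &mut [u8])
        -> Poll<io::Result<usize>> {
        let this = Pin::into_inner(self);
        let n = buf.len().min(this.data.len() - this.pos);
        buf[..n].copy_from_slice(&this.data[this.pos..this.pos + n]);
        this.pos += n;
        Poll::Ready(Ok(n))
    }
}

/// Adapter with extra state: a scratch buffer and a running byte count.
pub struct XorAdapter<R> { pub inner: R, pub key: u8, pub scratch: Vec<u8>, pub total: usize }

impl<R: PollRead + Unpin> PollRead for XorAdapter<R> {
    fn poll_read(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8])
        -> Poll<io::Result<usize>> {
        // `Self: Unpin` here, so the pin unwraps into a plain `&mut Self`,
        // which lets several fields be borrowed at the same time.
        let this = Pin::into_inner(self);
        this.scratch.resize(buf.len(), 0);
        // Re-pin only the inner reader for the nested call.
        let n = match Pin::new(&mut this.inner).poll_read(cx, &mut this.scratch) {
            Poll::Ready(Ok(n)) => n,
            other => return other, // Pending or an error: pass it through
        };
        for (dst, src) in buf[..n].iter_mut().zip(&this.scratch[..n]) {
            *dst = *src ^ this.key;
        }
        this.total += n;
        Poll::Ready(Ok(n))
    }
}

struct NoopWake;
impl Wake for NoopWake { fn wake(self: Arc<Self>) {} }

/// Hypothetical helper: poll once using a waker that does nothing.
pub fn read_once<R: PollRead + Unpin>(reader: &mut R, buf: &mut [u8])
    -> Poll<io::Result<usize>> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    Pin::new(reader).poll_read(&mut cx, buf)
}
```

For plain XOR the scratch buffer is unnecessary, since the bytes could be transformed in place. It is included only to show intermediate state living next to the inner reader. If the inner reader is not `Unpin`, this pattern does not apply as written and some form of pin projection is needed instead.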