Writing code to process a stream

Stream processing problem

I think the io interface and Iterator are two of Rust's great strengths: writing similar code in a language like C invites disaster, because a small mistake or gap in understanding leads to crashes or remote code execution. I have the following use case, and once I understand how to implement it I think I will understand streams completely.

Use case

My use case is processing the RIFF format, a container format used for things like WAV, amongst others. The layout below is shamelessly ripped off from the Microsoft WAVE soundfile format description. (I am using a simplified version of the format that captures the key features I need to understand.)

File format

The format is as follows:

  1. 4 bytes ascii - the ascii bytes for "RIFF" (in hex: 0x52 0x49 0x46 0x46)
  2. 4 bytes uint (little-endian) - The length of the stream/file excluding the first field and this field (a u32 length means the maximum size is ~ 4GB)
  3. 4 bytes ascii - the format of the riff file (e.g. "WAVE" in hex: 0x57 0x41 0x56 0x45)

Followed by 0 or more chunks with the following format

  1. 4 bytes ascii - the name of the chunk (e.g. "test" in hex: 0x74 0x65 0x73 0x74)
  2. 4 bytes uint (little-endian) - The length of the chunk content (excluding name or length)
  3. variable - The content of the chunk
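The two fixed-size headers above can be read without any heap allocation. Here is a minimal sketch, assuming the simplified layout described in this post; the names `RiffHeader`, `ChunkHeader`, `read_riff_header` and `read_chunk_header` are made up for illustration and are not from any existing crate.

```rust
use std::io::{self, Read};

/// The 12-byte file header: "RIFF", a length, then a format tag.
#[derive(Debug, PartialEq)]
struct RiffHeader {
    size: u32,       // length of everything after the first 8 bytes
    format: [u8; 4], // e.g. *b"WAVE"
}

/// The 8-byte header that precedes each chunk's content.
#[derive(Debug, PartialEq)]
struct ChunkHeader {
    name: [u8; 4],
    size: u32, // length of the content only
}

fn read_riff_header<R: Read>(reader: &mut R) -> io::Result<RiffHeader> {
    let mut buf = [0u8; 12]; // lives on the stack
    reader.read_exact(&mut buf)?;
    if &buf[..4] != b"RIFF" {
        return Err(io::Error::new(io::ErrorKind::InvalidData, "missing RIFF magic"));
    }
    Ok(RiffHeader {
        size: u32::from_le_bytes([buf[4], buf[5], buf[6], buf[7]]),
        format: [buf[8], buf[9], buf[10], buf[11]],
    })
}

fn read_chunk_header<R: Read>(reader: &mut R) -> io::Result<ChunkHeader> {
    let mut buf = [0u8; 8];
    reader.read_exact(&mut buf)?;
    Ok(ChunkHeader {
        name: [buf[0], buf[1], buf[2], buf[3]],
        size: u32::from_le_bytes([buf[4], buf[5], buf[6], buf[7]]),
    })
}
```

Both functions take `&mut R`, so the caller keeps ownership of the stream and can go on to read the chunk content from the same reader.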

Rust Library Interface

I want to create a rust library with the following characteristics:

  • Extract the structure where possible, and otherwise pass calls straight through to Read
  • Be as fast as possible (0 allocations)
  • Be easy to use, and easy to compose with other libraries, not making any assumptions about the stream beyond the format above
  • Handle errors gracefully

With that in mind, I see the core structs as follows

use std::io::Read;

struct Riff<R: Read> {
    reader: R,
    name: [u8; 4],
    size: u32,
}

struct RiffChunk<R: Read> {
    reader: R,
    name: [u8; 4],
    size: u32,
}

with the following methods

impl<R: Read> Iterator for Riff<R> {
    type Item = RiffChunk<R>;

    fn next(&mut self) -> Option<RiffChunk<R>> { ... }
}

impl<R: Read> Read for RiffChunk<R> {
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> { ... }
}

so the stream is converted to an iterator of streams. (I'm ignoring the details of reading the fixed-size values, as this is easy: they are small and live on the stack, so they can just be copied.)
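The `Read` half of this is the easier part. Below is a minimal sketch, assuming the chunk header has already been consumed; `BoundedChunk` is a hypothetical variant of `RiffChunk` that stores the number of unread content bytes instead of the total size.

```rust
use std::io::{self, Read};

/// Hypothetical variant of `RiffChunk` that tracks how much content is left.
struct BoundedChunk<R: Read> {
    reader: R,
    name: [u8; 4],
    remaining: u32, // content bytes not yet handed out
}

impl<R: Read> Read for BoundedChunk<R> {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        // Never ask the inner reader for more than the chunk still holds.
        let max = buf.len().min(self.remaining as usize);
        if max == 0 {
            return Ok(0); // end of chunk, or an empty buffer
        }
        let n = self.reader.read(&mut buf[..max])?;
        self.remaining -= n as u32; // n <= max <= remaining, so no underflow
        Ok(n)
    }
}
```

This is the same idea as the standard library's `Read::take` adaptor. Because `&mut R` also implements `Read`, the `reader` field can hold either an owned stream or a borrowed one.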

This is the point at which I'm a bit stuck. There will have to be some lifetime restrictions: e.g. no reads from a RiffChunk after next() has been called on the parent Riff. The library will (I assume) want to move the Reader into the Riff struct, and then provide an into_inner method to recover the raw stream if necessary.

Questions

  • How do I implement the above interface? Do I need helper structs (my guess is yes)?
  • What should the Error type look like? Ideally layout errors should contain the underlying Reader with no data consumed, so a higher level library could choose to do error recovery if they wanted. With IO errors it's probably not possible to recover the stream.
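One possible shape for the error type is sketched below. The names `RiffError` and `check_magic` are hypothetical. One caveat: a plain `Read` cannot "un-read" bytes, so this sketch hands the reader back together with the bytes already taken from it rather than with no data consumed.

```rust
use std::fmt;
use std::io::{self, Read};

/// Hypothetical error type for the library described above.
#[derive(Debug)]
enum RiffError<R> {
    /// The underlying stream failed; it is probably not recoverable.
    Io(io::Error),
    /// The first four bytes were not "RIFF". The reader is returned along
    /// with the bytes consumed from it, so a caller can attempt recovery.
    BadMagic { reader: R, found: [u8; 4] },
}

impl<R> fmt::Display for RiffError<R> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            RiffError::Io(e) => write!(f, "I/O error: {}", e),
            RiffError::BadMagic { found, .. } => {
                write!(f, "expected \"RIFF\", found {:?}", found)
            }
        }
    }
}

impl<R: fmt::Debug> std::error::Error for RiffError<R> {}

impl<R> From<io::Error> for RiffError<R> {
    fn from(e: io::Error) -> Self {
        RiffError::Io(e)
    }
}

/// Takes the reader by value and gives it back on success or on a layout error.
fn check_magic<R: Read>(mut reader: R) -> Result<R, RiffError<R>> {
    let mut magic = [0u8; 4];
    reader.read_exact(&mut magic)?; // io::Error converts via `From`
    if &magic != b"RIFF" {
        return Err(RiffError::BadMagic { reader, found: magic });
    }
    Ok(reader)
}
```

A higher-level library could match on `BadMagic`, inspect `found`, and decide whether to keep using the returned reader.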

I hope all this makes sense. I'm really excited about using Rust to safely process streaming data without any unnecessary allocations - and it feels like the only barrier is my understanding.

This seems relevant: https://www.reddit.com/r/rust/comments/303a09/looking_for_more_information_on_streaming/

I'm getting closer....

It looks like Iterator is not a good fit for the blocks iterator, as it has methods that don't make sense here. For example, it is not possible to collect() items as each item is only borrowed until the next item is fetched. It looks like a new trait is needed, something like

trait StreamingIterator {
    type Item;

    fn next<'a>(&'a mut self) -> Option<&'a Self::Item>;
}

I haven't figured out the right signature yet.

This kind of iterator is not possible in today's Rust; we need more type system features.

Ah ok, what features are they? Are they higher kinds? I've heard that name mentioned (I more or less understand it from my brief brush with category theory).

I think I've found a way to implement this, but it's a bit cumbersome. I have to move into the iterator item, and then manually move back out again when I've read the entire chunk.
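The move-in, move-out approach could look roughly like the sketch below. It is only an illustration under the simplified format above: `OwnedChunk`, `into_next` and `finish` are hypothetical names, `Riff` here tracks remaining bytes rather than the fields shown earlier, and reading the chunk content is left out for brevity.

```rust
use std::io::{self, Read};

struct Riff<R: Read> {
    reader: R,
    remaining: u32, // bytes of chunk data left, per the RIFF size field
}

/// Owns the reader while the chunk is being processed.
struct OwnedChunk<R: Read> {
    reader: R,
    name: [u8; 4],
    remaining: u32,      // unread content bytes in this chunk
    riff_remaining: u32, // bytes left in the file after this chunk
}

impl<R: Read> Riff<R> {
    /// Consumes the `Riff`, moving the reader into the next chunk (if any).
    fn into_next(mut self) -> io::Result<Option<OwnedChunk<R>>> {
        if self.remaining < 8 {
            return Ok(None); // no room for another chunk header
        }
        let mut h = [0u8; 8];
        self.reader.read_exact(&mut h)?;
        let size = u32::from_le_bytes([h[4], h[5], h[6], h[7]]);
        let riff_remaining = (self.remaining - 8).checked_sub(size).ok_or_else(|| {
            io::Error::new(io::ErrorKind::InvalidData, "chunk overruns file")
        })?;
        Ok(Some(OwnedChunk {
            reader: self.reader,
            name: [h[0], h[1], h[2], h[3]],
            remaining: size,
            riff_remaining,
        }))
    }
}

impl<R: Read> OwnedChunk<R> {
    /// Skips any unread content and moves the reader back into a `Riff`.
    fn finish(mut self) -> io::Result<Riff<R>> {
        let want = u64::from(self.remaining);
        let skipped = io::copy(&mut (&mut self.reader).take(want), &mut io::sink())?;
        if skipped != want {
            return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "chunk truncated"));
        }
        Ok(Riff { reader: self.reader, remaining: self.riff_remaining })
    }
}
```

A caller would loop with something like `while let Some(chunk) = riff.into_next()? { riff = chunk.finish()?; }`. Ownership rules out using a stale chunk, but the hand-back is manual, which is the cumbersome part.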

My main problem with borrowing is that I have no way to express the lifetime of the iterator items. What I want is

  • once the iterator has started the reader is no longer accessible on the Riff object
  • once next() has been called, the previous iterator item is no longer valid

If I could implement this then I could create very easy to understand, easy to compose, high performance stream processing for e.g. digital audio.

Is it possible to do this using unsafe? Or container structs? Or is it just not possible at all?
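Both restrictions can be expressed in safe code if the `Iterator` trait is given up in favour of an inherent method whose return value mutably borrows the parent. The sketch below assumes the simplified format above; `Chunk`, `next_chunk` and the bookkeeping fields are hypothetical names, not an existing API.

```rust
use std::io::{self, Read};

struct Riff<R: Read> {
    reader: R,            // private: not reachable while iterating
    remaining: u32,       // bytes left in the file after the current chunk
    chunk_remaining: u32, // unread content of the chunk handed out last
}

/// Holds `&mut Riff`, so it must be dropped before `next_chunk` is called again.
struct Chunk<'a, R: Read> {
    riff: &'a mut Riff<R>,
    name: [u8; 4],
}

impl<R: Read> Riff<R> {
    fn next_chunk(&mut self) -> io::Result<Option<Chunk<'_, R>>> {
        // Skip whatever the previous chunk left unread.
        let skip = u64::from(self.chunk_remaining);
        let skipped = io::copy(&mut (&mut self.reader).take(skip), &mut io::sink())?;
        if skipped != skip {
            return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "chunk truncated"));
        }
        self.chunk_remaining = 0;
        if self.remaining < 8 {
            return Ok(None);
        }
        let mut h = [0u8; 8];
        self.reader.read_exact(&mut h)?;
        let size = u32::from_le_bytes([h[4], h[5], h[6], h[7]]);
        self.remaining = (self.remaining - 8).checked_sub(size).ok_or_else(|| {
            io::Error::new(io::ErrorKind::InvalidData, "chunk overruns file")
        })?;
        self.chunk_remaining = size;
        Ok(Some(Chunk { riff: self, name: [h[0], h[1], h[2], h[3]] }))
    }

    /// Recovers the raw stream once iteration is over.
    fn into_inner(self) -> R {
        self.reader
    }
}

impl<'a, R: Read> Read for Chunk<'a, R> {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        let max = buf.len().min(self.riff.chunk_remaining as usize);
        if max == 0 {
            return Ok(0);
        }
        let n = self.riff.reader.read(&mut buf[..max])?;
        self.riff.chunk_remaining -= n as u32;
        Ok(n)
    }
}
```

With this shape, `while let Some(mut chunk) = riff.next_chunk()? { ... }` works, and keeping an old `Chunk` alive across another `next_chunk` call is rejected by the borrow checker. The price is that adaptors such as `collect()` or `map()` from `Iterator` are not available.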

Some form of higher-kinded polymorphism, yes. Higher-kinded types would work, and possibly associated type constructors too. The post "Associated type constructors, part 1: basic concepts and introduction" on the baby steps blog has some related material, as does https://github.com/rust-lang/rfcs/pull/1598, the latter a bit more directly.

Thanks for the links

Ha, I've just read the RFC and it's exactly what I'm looking for.
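For reference, here is a sketch of what the trait can look like on a toolchain where the generic associated types from that RFC are available (they were stabilized in Rust 1.65). `LendingIterator` and `WindowsMut` are illustrative names, not part of the standard library.

```rust
/// Hypothetical trait: each item may borrow from the iterator itself.
trait LendingIterator {
    type Item<'a>
    where
        Self: 'a;

    fn next<'a>(&'a mut self) -> Option<Self::Item<'a>>;
}

/// Example implementor: overlapping mutable windows over a slice, which an
/// ordinary `Iterator` cannot hand out because the items would alias.
struct WindowsMut<'s, T> {
    slice: &'s mut [T],
    start: usize,
    len: usize,
}

impl<'s, T> LendingIterator for WindowsMut<'s, T> {
    type Item<'a> = &'a mut [T] where Self: 'a;

    fn next<'a>(&'a mut self) -> Option<Self::Item<'a>> {
        let end = self.start.checked_add(self.len)?;
        // Returns None once the window would run past the end of the slice.
        let window = self.slice.get_mut(self.start..end)?;
        self.start += 1;
        Some(window)
    }
}
```

The item type is parameterised by the lifetime of the `&mut self` borrow, which is the "previous item is invalid once next() is called" rule written into the signature. A chunk iterator could use `type Item<'a>` for a chunk struct that borrows the parent in the same way.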