Reading from streams, wrapping streams, issues with ownership

I'm writing a parser for a binary stream. My general approach is an Iterator<Item=Element> where calling .next() parses bytes from the stream and returns the next element. Like BufReader, my Parser takes ownership of the stream, which can be any type implementing Read, so something like this:

use std::io::Read;

struct Parser<T: Read> {
  dataset: T,
}

impl<T: Read> Parser<T> {
  fn new(dataset: T) -> Parser<T> {
    Parser { dataset }
  }
}

impl<T: Read> Iterator for Parser<T> {
  type Item = Element;
  fn next(&mut self) -> Option<Self::Item> {
    //...
    self.dataset.read_exact(...);
    //...
  }
}

The binary format I'm parsing supports a feature where a given Element can indicate that all following elements are compressed with DEFLATE (RFC 1951). To support this I'd like to use the libflate crate to wrap self.dataset in its Decoder, which also implements Read, and continue reading bytes from that wrapped stream. I'd like to do something like this:

impl<T: Read> Iterator for Parser<T> {
  type Item = Element;
  fn next(&mut self) -> Option<<Self as Iterator>::Item> {
    if self.last_element.toggle_deflation() {
      self.dataset = Decoder::new(self.dataset);
    }
    //...
    self.dataset.read_exact(...);
  }
}

However, this is not possible: it moves self.dataset into the Decoder, which wants ownership of the stream it reads from, and next() only takes &mut self, so I can't move a field out like this. On top of that, I've run into type issues with the generics I've set up, since a Decoder<T> is not a T. I think I can work around that by dropping the generic and making the field a Box<dyn Read>, but I still have the problem of needing to move the stream I've been reading from into the decoder. How can I go about doing this?

Can you do something like this?

impl<T: Read> Iterator for Parser<T> {
  type Item = Element;
  fn next(&mut self) -> Option<<Self as Iterator>::Item> {
    if self.last_element.toggle_deflation() {
      let mut decoder = Decoder::new(self.dataset.by_ref());
      decoder.read_exact(...);
    }

    //...    
  }
}

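The idea is that you create the Decoder in its own scope and use Read::by_ref() to get a &mut T. Because &mut T implements Read whenever T does, you can pass it to Decoder::new() by value: the Decoder "owns" the mutable borrow rather than the stream itself.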
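To make the by_ref() idea concrete, here is a minimal sketch that only uses the standard library. XorDecoder is a hypothetical stand-in for libflate's Decoder (it just flips the bits of every byte), and read_mixed is an illustrative helper, not part of your parser.

```rust
use std::io::{Read, Result};

/// Hypothetical stand-in for libflate's `Decoder`: it takes any `R: Read`
/// by value and "decodes" by XOR-ing each byte with 0xFF.
struct XorDecoder<R: Read> {
    inner: R,
}

impl<R: Read> XorDecoder<R> {
    fn new(inner: R) -> Self {
        XorDecoder { inner }
    }
}

impl<R: Read> Read for XorDecoder<R> {
    fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
        let n = self.inner.read(buf)?;
        for b in &mut buf[..n] {
            *b ^= 0xFF;
        }
        Ok(n)
    }
}

/// Reads two plain bytes, then two bytes through a scoped decoder,
/// then two more plain bytes, all from the same stream.
fn read_mixed<R: Read>(mut stream: R) -> Result<[u8; 6]> {
    let mut out = [0u8; 6];
    stream.read_exact(&mut out[0..2])?;
    {
        // `by_ref()` yields `&mut R`, which itself implements `Read`,
        // so the decoder takes the borrow by value, not the stream.
        let mut decoder = XorDecoder::new(stream.by_ref());
        decoder.read_exact(&mut out[2..4])?;
    } // the mutable borrow ends with this scope
    stream.read_exact(&mut out[4..6])?;
    Ok(out)
}
```

Calling read_mixed on a byte slice such as &[1u8, 2, 0xFE, 0xFD, 5, 6][..] reads the middle two bytes through the decoder and the rest directly. This works cleanly here only because the stand-in does no buffering of its own.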

You'll need to be careful about buffering too. Decompressors normally buffer their input (e.g. imagine one that pulls in compressed data in chunks of 64 bytes). That means that by the time read_exact() has given you the last byte of a field, the Decoder may already have read some bytes past that point from the underlying reader. If you then drop the Decoder, those read-ahead bytes are lost.
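The read-ahead effect is easy to observe with the standard library alone. In this sketch BufReader stands in for any wrapper that buffers internally; I'm not claiming libflate's Decoder buffers in exactly this way, and bytes_consumed is just an illustrative helper.

```rust
use std::io::{BufReader, Cursor, Read, Result};

/// Reads `want` bytes through a temporary `BufReader` with the given
/// capacity and returns how far the underlying cursor actually advanced.
fn bytes_consumed(data: Vec<u8>, capacity: usize, want: usize) -> Result<u64> {
    let mut source = Cursor::new(data);
    {
        // `BufReader` is a stand-in for any internally buffering wrapper.
        let mut buffered = BufReader::with_capacity(capacity, source.by_ref());
        let mut field = vec![0u8; want];
        buffered.read_exact(&mut field)?;
    } // bytes still sitting in the wrapper's buffer are discarded here
    Ok(source.position())
}
```

With 16 bytes of input, a capacity of 8, and a 2-byte field, the cursor ends up further along than the 2 bytes that were actually requested, so a plain read afterwards would skip data.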

Also, I don't think you want to do self.dataset = Decoder::new(self.dataset)... Even ignoring the fact that a Decoder<T> can't be assigned to a T, if you were to use something like Box::new(Decoder::new(self.dataset)) you'd end up wrapping the decoder in a new Box every time you see self.last_element.toggle_deflation(). That means reading from self.dataset would be like iterating to the end of a linked list, where you add a new level of indirection whenever you see the flag.
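If the wrap only ever needs to happen once, the boxed variant can be made to work from &mut self by swapping a placeholder into the field with std::mem::replace. This is only a sketch under assumptions: BoxedParser, enable_decoding, and read_byte are hypothetical names, and XorDecoder is a stand-in for libflate's Decoder so the example needs nothing outside the standard library.

```rust
use std::io::{self, Read};
use std::mem;

/// Hypothetical stand-in for libflate's `Decoder` (XORs each byte with 0xFF).
struct XorDecoder<R: Read> {
    inner: R,
}

impl<R: Read> Read for XorDecoder<R> {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        let n = self.inner.read(buf)?;
        for b in &mut buf[..n] {
            *b ^= 0xFF;
        }
        Ok(n)
    }
}

/// Hypothetical parser that stores its stream as a trait object.
struct BoxedParser {
    dataset: Box<dyn Read>,
    wrapped: bool,
}

impl BoxedParser {
    fn new<R: Read + 'static>(dataset: R) -> BoxedParser {
        BoxedParser { dataset: Box::new(dataset), wrapped: false }
    }

    /// Wraps the stream in the decoder at most once, so repeated calls
    /// do not stack extra layers of indirection.
    fn enable_decoding(&mut self) {
        if self.wrapped {
            return;
        }
        // Swap in an empty reader so the real stream can be moved out
        // from behind `&mut self`.
        let placeholder: Box<dyn Read> = Box::new(io::empty());
        let plain = mem::replace(&mut self.dataset, placeholder);
        self.dataset = Box::new(XorDecoder { inner: plain });
        self.wrapped = true;
    }

    fn read_byte(&mut self) -> io::Result<u8> {
        let mut b = [0u8; 1];
        self.dataset.read_exact(&mut b)?;
        Ok(b[0])
    }
}
```

The wrapped flag is what keeps this from turning into the linked-list situation described above: a second call to enable_decoding is a no-op.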

Hello and thank you!

Can you do something like this?

The issue I have is that my parsing code is spread across ~20 other functions, each with its own call to self.dataset.read_exact(). I wouldn't mind updating all the call sites to conditionally read from one dataset or the other depending on some state (I would probably abstract that into its own struct). However, the solution you suggest relies on a dedicated scope to avoid ownership issues, which would require moving all of this code into the same function.

The idea being that you create a Decoder in its own scope, and use Read::by_ref() to pass an "owned" &mut T to Decoder::new().

If it's possible to use .by_ref() as an argument to the decoder then I think I can work through this issue. I was running into problems trying to pass in &self.dataset because Decoder::new() wants to take ownership and not a borrow. Would I run into the same issue using .by_ref()?

You'll need to be careful about buffering too. Normally things that do decompression will need to buffer a certain number of bytes (i.e. imagine it decompresses in chunks of 64 bytes).

Yep. Once I figure out how to structure this so that I can wrap the given dataset, I'm also planning to wrap it in a BufReader so that the binary stream is pulled in a chunk at a time for decoding/reading.

...you'd end up wrapping the decoder in a new Box every time you see self.last_element.toggle_deflation().

Fortunately the toggle would only ever happen once. I simplified the code here to give an example, but in the actual binary stream the deflation toggle can only happen once and can't be undone. It's more of a marker indicating that the rest of the stream is deflated.

This does not appear to be possible, as far as I can tell. The solution I went with was to unconditionally store the dataset inside a Decoder and, when deflation is not needed, use dataset.as_inner_mut() to get access to the underlying dataset.

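    use std::io::{BufReader, Read, Result};

    // Assumes the raw DEFLATE decoder from libflate's `deflate` module.
    use libflate::deflate::Decoder;

    // The stream is always stored inside the Decoder; the flag selects
    // whether reads go through the Decoder or straight to the BufReader.
    pub struct Dataset<DatasetType: Read> {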
        deflated: Decoder<BufReader<DatasetType>>,
        read_deflated: bool,
    }

    impl<DatasetType: Read> Dataset<DatasetType> {
        pub fn new(dataset: DatasetType, buffsize: usize) -> Dataset<DatasetType> {
            Dataset {
                deflated: Decoder::new(BufReader::with_capacity(buffsize, dataset)),
                read_deflated: false,
            }
        }

        pub fn set_read_deflated(&mut self, read_deflated: bool) {
            self.read_deflated = read_deflated;
        }
    }

    impl<DatasetType: Read> Read for Dataset<DatasetType> {
        fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
            if self.read_deflated {
                self.deflated.read(buf)
            } else {
                self.deflated.as_inner_mut().read(buf)
            }
        }
    }
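For anyone who wants to try the switching behaviour without pulling in libflate, here is a standard-library-only sketch of the same shape. XorDecoder (including its as_inner_mut method) is a hypothetical stand-in that mirrors the one Decoder method used above, and Switchable and set_decode are illustrative names.

```rust
use std::io::{BufReader, Read, Result};

/// Hypothetical stand-in for libflate's `Decoder` (XORs each byte with 0xFF).
struct XorDecoder<R: Read> {
    inner: R,
}

impl<R: Read> XorDecoder<R> {
    fn new(inner: R) -> Self {
        XorDecoder { inner }
    }

    /// Mirrors the accessor used in the post above.
    fn as_inner_mut(&mut self) -> &mut R {
        &mut self.inner
    }
}

impl<R: Read> Read for XorDecoder<R> {
    fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
        let n = self.inner.read(buf)?;
        for b in &mut buf[..n] {
            *b ^= 0xFF;
        }
        Ok(n)
    }
}

/// Always owns the decoder; the flag picks which layer is read from.
struct Switchable<R: Read> {
    decoder: XorDecoder<BufReader<R>>,
    decode: bool,
}

impl<R: Read> Switchable<R> {
    fn new(source: R) -> Self {
        Switchable {
            decoder: XorDecoder::new(BufReader::new(source)),
            decode: false,
        }
    }

    fn set_decode(&mut self, decode: bool) {
        self.decode = decode;
    }
}

impl<R: Read> Read for Switchable<R> {
    fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
        if self.decode {
            self.decoder.read(buf)
        } else {
            // Bypass the decoder but keep using the same BufReader, so
            // bytes it has already buffered are not lost on the switch.
            self.decoder.as_inner_mut().read(buf)
        }
    }
}
```

Both paths share one BufReader, so switching from plain to decoded reads doesn't drop buffered input. Switching back would be a different story with a real decoder that buffers internally, but in my format the toggle is one-way.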
