Lazily read file in chunks split on delimiter?

I'm trying to parse a file that contains multiple XML values separated by a delimiter (of sorts):

# foo
<something>
   ...
</something>

# bar
<another-thing>
   ...
</another-thing>

# etc.

Currently I'm just iterating over the file line-by-line and buffering each chunk in memory:

use std::fs::File;
use std::io::{BufRead, BufReader};

let file = File::open("/path/to/file/above")?;
let buf_reader = BufReader::new(file);

let mut lines: Vec<String> = Vec::new();
let mut in_content = false;

for line in buf_reader.lines() {
  // `lines()` yields `io::Result<String>`, so propagate read errors
  let content = line?;
  if content.starts_with("# ") {
    in_content = true;
  } else if content.is_empty() {
    in_content = false;
    if !lines.is_empty() {
      // bind the joined String so the borrowed bytes stay valid
      let chunk = lines.join("\n");
      let chunk_bytes = chunk.as_bytes();

      // parse XML chunk and do something with result

      lines.clear();
    }
  } else if in_content {
    lines.push(content);
  }
}

I'd rather not have to buffer each chunk in memory like this since it's a large file and I'm using quick_xml which is capable of streaming parsing. Is there a good way to split the file lazily and get a BufReader for each chunk?

You'll have to write something which sits in the middle of the reading process and intervenes before quick_xml sees the next #, such that quick_xml sees an end-of-file instead.

One way to do it would be to create a Read adapter (like BufReader is); it would watch the text it reads, with a state machine checking for \n# sequences, and decide when to start producing EOF instead of forwarding reads to the underlying Read. Once that happens, you can reset the state of the adapter and pass it to a new quick_xml session.

Another way would be to spawn a thread, which reads in a loop like you currently have; but instead of buffering a whole chunk, it writes it to a channel, and the receiver end of the channel is hooked up to quick_xml (probably using another impl Read for MyAdapter). This is not as efficient as the first option but it might be easier to write.
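
A rough sketch of the channel idea, assuming one channel per chunk and a producer thread that sends raw byte pieces. `ChannelReader` and `read_one_chunk` are made-up names for illustration, not part of std or quick_xml. Dropping the sender is what the reading side sees as EOF:

```rust
use std::io::{self, Read};
use std::sync::mpsc::{self, Receiver};
use std::thread;

/// Hypothetical adapter: exposes the receiving end of a channel as a `Read`.
struct ChannelReader {
    rx: Receiver<Vec<u8>>,
    pending: Vec<u8>, // piece received but not yet fully handed out
    pos: usize,       // how much of `pending` has been consumed
}

impl ChannelReader {
    fn new(rx: Receiver<Vec<u8>>) -> Self {
        ChannelReader { rx, pending: Vec::new(), pos: 0 }
    }
}

impl Read for ChannelReader {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        // Refill from the channel once the current piece is used up.
        while self.pos == self.pending.len() {
            match self.rx.recv() {
                Ok(piece) => {
                    self.pending = piece;
                    self.pos = 0;
                }
                // Every sender was dropped: report end-of-file.
                Err(_) => return Ok(0),
            }
        }
        let n = buf.len().min(self.pending.len() - self.pos);
        buf[..n].copy_from_slice(&self.pending[self.pos..self.pos + n]);
        self.pos += n;
        Ok(n)
    }
}

/// Illustrative driver: a producer thread sends one chunk in small pieces.
fn read_one_chunk() -> io::Result<String> {
    let (tx, rx) = mpsc::channel::<Vec<u8>>();
    let producer = thread::spawn(move || {
        let pieces: [&[u8]; 3] = [b"<a>", b"text", b"</a>"];
        for piece in pieces {
            tx.send(piece.to_vec()).unwrap();
        }
        // `tx` is dropped here, which the reader observes as EOF.
    });
    let mut out = String::new();
    ChannelReader::new(rx).read_to_string(&mut out)?;
    producer.join().unwrap();
    Ok(out)
}
```

In a real program the consumer would hand `ChannelReader` (wrapped in a `BufReader`) to the XML parser instead of calling `read_to_string`.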

Note that in either case, you should not read whole lines, because lines in an XML document can be arbitrarily long. Use only individual bytes or some chosen buffer/chunk size, and remember that the thing you are doing is itself a parser, though a simple one.
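
To illustrate scanning without reading whole lines, here is a small sketch that looks for `#` at the start of a line while reading fixed-size blocks. The function name `count_headers` and the `block_size` parameter are assumptions made for this example; the point is only that the "am I at the start of a line?" state has to survive across block boundaries:

```rust
use std::io::{self, Read};

/// Counts lines that start with `#`, reading fixed-size blocks rather than
/// whole lines. `block_size` is an arbitrary, illustrative parameter.
fn count_headers<R: Read>(mut src: R, block_size: usize) -> io::Result<usize> {
    let mut block = vec![0u8; block_size.max(1)];
    let mut at_line_start = true; // state carried across block boundaries
    let mut headers = 0;
    loop {
        let n = src.read(&mut block)?;
        if n == 0 {
            return Ok(headers); // end of input
        }
        for &byte in &block[..n] {
            if at_line_start && byte == b'#' {
                headers += 1;
            }
            at_line_start = byte == b'\n';
        }
    }
}
```

Memory use is bounded by `block_size` no matter how long an individual line of XML is.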

Thanks @kpreid! Inserting a Read implementation in between makes a lot of sense. The state transitions are simple enough in this case that I can probably just do it "by hand" as the next byte is read (as opposed to a more formal parser w/ a grammar, etc.). How do I signal the "fake" EOF? Just return an Ok(0) from read? I'm not super familiar with these traits yet.

Yup:

If n is 0, then it can indicate one of two scenarios:

  1. This reader has reached its “end of file” and will likely no longer be able to produce bytes.

https://doc.rust-lang.org/stable/std/io/trait.Read.html#tymethod.read
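
As a small illustration of that contract, the sketch below wraps a reader and returns Ok(0) after a fixed number of bytes even though the underlying data continues. `FakeEof` and `first_bytes` are hypothetical names for this example (std's `Read::take` offers similar behavior); `read_to_end` stops as soon as it sees Ok(0):

```rust
use std::io::{self, Read};

/// Hypothetical wrapper that reports EOF after `remaining` bytes,
/// even though `inner` may still have data.
struct FakeEof<R> {
    inner: R,
    remaining: usize,
}

impl<R: Read> Read for FakeEof<R> {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        if self.remaining == 0 {
            return Ok(0); // the "fake" EOF
        }
        let max = buf.len().min(self.remaining);
        let n = self.inner.read(&mut buf[..max])?;
        self.remaining -= n;
        Ok(n)
    }
}

/// Reads at most `limit` bytes of `data` through the wrapper.
fn first_bytes(data: &[u8], limit: usize) -> io::Result<Vec<u8>> {
    let mut reader = FakeEof { inner: data, remaining: limit };
    let mut out = Vec::new();
    reader.read_to_end(&mut out)?; // returns once `read` yields Ok(0)
    Ok(out)
}
```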

Thank you both. I was able to implement this intermediate layer (probably not very well, mind you) and it seems to work okay when I call read_to_end multiple times:

let file = File::open("/path/to/file/above")?;
let mut chunker = Chunker::new(BufReader::new(file));

let mut buf1: Vec<u8> = Vec::new();
chunker.read_to_end(&mut buf1)?;
println!("Chunk 1: {0}", String::from_utf8_lossy(&buf1));

let mut buf2: Vec<u8> = Vec::new();
chunker.read_to_end(&mut buf2)?;
println!("Chunk 2: {0}", String::from_utf8_lossy(&buf2));

(Chunker is my intermediate reader - I'll paste the full source below)

This correctly prints the 1st block of XML followed by the 2nd block of XML. When I pass the chunker to quick_xml though it seems to read both chunks in 1 go. Seems like I'm not signaling EOF properly to quick_xml. Anyone have any ideas here? I know this is getting pretty niche now.

Source for Chunker:

#[derive(Debug, PartialEq, Clone)]
enum ChunkerState {
    Skip,
    Path,
    PathEOL,
    Chunk,
}

struct Chunker<R> {
    buf_reader: BufReader<R>,
    state: ChunkerState,
    eol: bool,
    eof: bool,
}

impl<R> Chunker<R> {
    fn new(buf_reader: BufReader<R>) -> Chunker<R> {
        Chunker {
            buf_reader,
            state: ChunkerState::Skip,
            eol: false,
            eof: false,
        }
    }

    fn state_transition(&mut self, byte: u8) {
        match self.state {
            ChunkerState::Skip => {
                if byte == b'#' {
                    self.state = ChunkerState::Path;
                }
            },
            ChunkerState::Path => {
                if byte == b'\n' {
                    self.state = ChunkerState::PathEOL;
                }
            },
            ChunkerState::PathEOL => {
                if byte != b'\n' && self.eol {
                    self.state = ChunkerState::Chunk;
                }
            },
            ChunkerState::Chunk => {
                if byte == b'#' && self.eol {
                    self.state = ChunkerState::Path;
                }
            }
        }

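        // track whether the previous byte ended a line; without resetting
        // this, any '#' after the first newline would end the chunk
        self.eol = byte == b'\n';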
    }
}

impl<R: Read> Read for Chunker<R> {
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        let capacity = buf.len();
        let mut byte_buf = [0 as u8; 1];
        let mut buf_index = 0;

        if self.eof {
            // fake EOF since we reached the end of a chunk
            self.eof = false;
            return Ok(0);
        }

        loop {
            let size = self.buf_reader.read(&mut byte_buf)?;
            if size == 0 {
                // actual EOF
                return Ok(buf_index);
            }

            let byte = byte_buf[0];
            self.state_transition(byte);

            if self.state == ChunkerState::Chunk {
                // we're in the chunk so copy the byte
                buf[buf_index] = byte;
                buf_index += 1;
            } else if buf_index > 0 {
                // we have some previous chunk to emit
                self.eof = true;
                return Ok(buf_index)
            }

            if buf_index == capacity  {
                // buf is full for this call to `read`
                return Ok(buf_index);
            }
        }
    }
}

If you regain control between quick_xml invocations, you could make your Chunker only reset its self.eof state by calling some explicit method.

Hmm, my current implementation looks something like the example in the docs for Reader in quick_xml::reader. It reads XML tags in a loop and only breaks on EOF. It's unclear to me why it's not seeing the EOF that the Chunker signals via Ok(0). I suppose I could detect the final closing tag of each XML chunk and return control that way.

No, don't do more parsing; that will be extremely hard to do correctly. Just make it return Ok(0) every time it is called, not only the first, until the flag/state is externally reset by your controlling code.

Oh yes, that's so simple and worked perfectly. Thank you! Not sure why that hadn't crossed my mind to clear the EOF flag from outside.
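
For later readers, here is a minimal sketch of the "sticky EOF" idea. It is not the poster's final code: `StickyChunker`, `next_chunk`, and `split_chunks` are hypothetical names, and the sketch assumes headers are lines starting with `#`. Once a chunk ends, `read` keeps returning Ok(0) until the controlling code calls `next_chunk`:

```rust
use std::io::{self, Read};

/// Hypothetical chunker with a "sticky" EOF (all names are illustrative).
struct StickyChunker<R> {
    inner: R,            // wrap files in a BufReader so 1-byte reads stay cheap
    at_line_start: bool, // previous byte was '\n' (or nothing read yet)
    in_header: bool,     // currently skipping a "# ..." line
    seen_body: bool,     // current chunk has produced non-whitespace bytes
    chunk_done: bool,    // sticky EOF flag, cleared only by next_chunk()
    exhausted: bool,     // the underlying reader hit its real EOF
}

impl<R: Read> StickyChunker<R> {
    fn new(inner: R) -> Self {
        StickyChunker {
            inner,
            at_line_start: true,
            in_header: false,
            seen_body: false,
            chunk_done: false,
            exhausted: false,
        }
    }

    /// Re-arms the reader for the next chunk; false once the input is used up.
    fn next_chunk(&mut self) -> bool {
        if self.exhausted {
            return false;
        }
        self.chunk_done = false;
        self.seen_body = false;
        true
    }
}

impl<R: Read> Read for StickyChunker<R> {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        let mut written = 0;
        let mut one = [0u8; 1];
        // While `chunk_done` is set, this loop is skipped and Ok(0) is returned.
        while !self.chunk_done && written < buf.len() {
            if self.inner.read(&mut one)? == 0 {
                self.exhausted = true;
                self.chunk_done = true;
                break;
            }
            let byte = one[0];
            let starts_line = self.at_line_start;
            self.at_line_start = byte == b'\n';
            if self.in_header {
                // Drop header bytes up to and including the newline.
                self.in_header = byte != b'\n';
            } else if starts_line && byte == b'#' {
                self.in_header = true;
                // A header after real content closes the current chunk.
                self.chunk_done = self.seen_body;
            } else {
                buf[written] = byte;
                written += 1;
                self.seen_body |= !byte.is_ascii_whitespace();
            }
        }
        Ok(written)
    }
}

/// Illustrative driver: collects each chunk as trimmed text.
fn split_chunks(input: &[u8]) -> io::Result<Vec<String>> {
    let mut chunker = StickyChunker::new(input);
    let mut chunks = Vec::new();
    loop {
        let mut text = String::new();
        chunker.read_to_string(&mut text)?; // a streaming parser would go here
        if !text.trim().is_empty() {
            chunks.push(text.trim().to_string());
        }
        if !chunker.next_chunk() {
            return Ok(chunks);
        }
    }
}
```

Because the flag is only cleared from outside, it does not matter how many times the parser polls the reader after the first Ok(0). A header that begins exactly at a `read` boundary still ends the chunk, since the flag is set even when no bytes were written in that call.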
