Stream parsing large multiline JSON gzip files using reqwest

Hi there!

I've been trying to write some code that:

  1. Uses reqwest to retrieve a stream from a URL
  2. The URL returns a gzipped file containing newline-delimited JSON, so each line is a separate JSON entry.
  3. The files are large (gigabytes even when gzipped), so I can't really load it all into memory at once
  4. I want to stream each decompressed line from the URL without loading the whole file into memory, and parse each line into JSON as it streams (I think I can handle the parsing on my own once I can read each line; see the sketch after this list).
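
For step 4, the per-line parsing I have in mind is roughly this (just a sketch, assuming serde_json and no particular schema; the function name is made up):

// Sketch only: parse one decompressed line as a standalone JSON document.
// serde_json::Value keeps it schema-free.
fn parse_line(line: &str) -> Result<serde_json::Value, serde_json::Error> {
    serde_json::from_str(line)
}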

Using a combination of different posts I found, I got the streaming part sort of working. There's definitely some weirdness about it (I wrapped the GzipDecoder in a BufReader to get an interface I could call lines on), and it also breaks: I can usually read a handful of lines, and then I get an Error: "" back because of the ok_or("")?.

I'm pretty new to Rust so I'm sure I'm doing something really wrong. But it's been hard to figure out how to approach this code since there are lots of conflicting examples available to pull from. Any help would be amazing!

use std::error::Error;
use async_compression::futures::bufread::GzipDecoder;
use futures::{
    io::{self, BufReader, ErrorKind},
    prelude::*,
};

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let url = "<url>";
    let response = reqwest::get(url).await?;
    let reader = response
        .bytes_stream()
        .map_err(|e| io::Error::new(ErrorKind::Other, e))
        .into_async_read();

    let decoder = BufReader::new(GzipDecoder::new(BufReader::new(reader)));
    let mut lines_stream = decoder.lines().map(|l| l.unwrap());

    let mut line = lines_stream.next().await.ok_or("")?;
    while line.len() > 0 {
        println!("{:?}", line);
        line = lines_stream.next().await.ok_or("")?;
    }

    Ok(())
}

I think you want unwrap_or, not ok_or. ok_or converts an Option into a Result, using the argument as the error value if the Option is None. But it looks like you're expecting line to be an empty string when the stream ends.
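
For example (just a sketch against the code above, not tested), you could also let the stream's Option end the loop directly instead of using an empty string as a sentinel:

// Sketch: next() yields None once the lines run out, so the Option
// itself can terminate the loop.
while let Some(line) = lines_stream.next().await {
    println!("{:?}", line);
}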

Thanks @semicoleon - that explains why I was seeing an "Error". Switching to unwrap_or makes it finish normally.

Still trying to figure out why it's only making it through a handful of lines. Every file seems to stop at a different place, but most never process more than a few lines before stopping.

My original version of this code was written in Ruby and had a similar problem, and I had to use a gem for multiple file gzip reading: GitHub - exAspArk/multiple_files_gzip_reader: GzipReader for reading multiple files

It changes how the gzip is read: after each member it checks for unused content, rewinds the io object to where that content starts, and then continues with a fresh gzip decoder. Maybe there's something similar I need to do in Rust?

file = File.open("test/fixtures/test.json.gz")
MultipleFilesGzipReader.new(file).each_line { |line| puts line }

# internally
loop do
  gzip_reader = Zlib::GzipReader.new(io, options)

  gzip_reader.each_line(*args, &block)

  unused = gzip_reader.unused
  gzip_reader.finish

  io.pos -= unused ? unused.length : 0
  break if io.pos == io.size
end

Maybe it's a multi-stream GZIP file, and the particular decoder implementation in that crate is not prepared to handle it? Try flate2::MultiGzDecoder.
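
If it helps, here's a minimal (untested) sketch of the synchronous flate2 route, assuming a local copy of the file named test.json.gz; you'd still have to adapt it to the async reqwest stream:

// Sketch: MultiGzDecoder keeps decoding across gzip member boundaries,
// whereas GzDecoder stops after the first member.
use flate2::read::MultiGzDecoder;
use std::fs::File;
use std::io::{BufRead, BufReader};

fn main() -> std::io::Result<()> {
    let file = File::open("test.json.gz")?; // hypothetical local file
    let reader = BufReader::new(MultiGzDecoder::new(file));
    for line in reader.lines() {
        println!("{}", line?);
    }
    Ok(())
}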

Thanks, I'll give that a try!

The fix ended up being pretty simple. After @H2CO3's comment I checked whether async-compression had a multi-gzip-decoder option, and it does, via a slightly different interface.

let mut gz_decoder = GzipDecoder::new(BufReader::new(reader));
gz_decoder.multiple_members(true); // decode across multiple gzip members (concatenated gzip streams)

So between the original code and the suggestions from @semicoleon and @H2CO3, here is the working version. Thanks, everyone!

It still seems odd to me that I am wrapping a reader in a buf reader in a decoder in another buf reader - but maybe that is fine? (I think the inner BufReader is there because the bufread GzipDecoder wants an AsyncBufRead as input, and the outer one because lines() also needs AsyncBufRead.) Seems to be working ok.

use std::error::Error;
use async_compression::futures::bufread::GzipDecoder;
use futures::{
    io::{self, BufReader, ErrorKind},
    prelude::*,
};

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let url = "<url>";
    let response = reqwest::get(url).await?;
    let reader = response
        .bytes_stream()
        .map_err(|e| io::Error::new(ErrorKind::Other, e))
        .into_async_read();

    let mut gz_decoder = GzipDecoder::new(BufReader::new(reader));
    gz_decoder.multiple_members(true); // decode across multiple gzip members (concatenated gzip streams)
    let decoder = BufReader::new(gz_decoder);
    let mut lines_stream = decoder.lines().map(|l| l.unwrap());

    let mut line = lines_stream.next().await.unwrap_or(String::from(""));
    while line.len() > 0 {
        println!("{:?}", line);
        line = lines_stream.next().await.unwrap_or(String::from(""));
    }

    Ok(())
}
