Streaming CSV allocates too much memory

I have a problem with my containers OOMing in production.

What seems to be the problem is that streaming data via reqwest and processing it through csv_async allocates memory for the full file being read:

		let resp = client
			.get(&url)
			.send_with_digest_auth(&self.i.config.credentials.user, &self.i.config.credentials.password)
			.await
			.with_context(|| format!("get feed: '{url}'"))?
			.error_for_status()
			.with_context(|| format!("get feed status: '{url}'"))?;
		let r = StreamReader::new(resp.bytes_stream().map_err(std::io::Error::other));
		let br = BufReader::new(r);
		let mut dec = GzipDecoder::new(br);
		dec.multiple_members(true);
		let mut reader = csv_async::AsyncReaderBuilder::new()
			.delimiter(b',')
			.double_quote(true)
			.has_headers(true)
			.create_reader(dec);
		let mut records = reader.records();
		while let Some(record) = records.next().await {
			// ... processing data
		}

Even if I don’t do any processing, memory equal to the full size of the file being read gets allocated, which can be tens of GiB.

I need to change this so that only the “current window” of the stream is allocated (which is what I had expected to begin with).

Do you know why this happens and how to fix it?

Imports:

use async_compression::tokio::bufread::GzipDecoder;
use csv_async::StringRecord;
use diqwest::WithDigestAuth;
use futures_util::{StreamExt, TryStreamExt};
use tokio::io::BufReader;
use tokio_util::io::StreamReader;

Is there a way to find out which layer of the streaming causes the memory to grow?
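One stdlib-only way to measure this is a counting global allocator: wrap `System`, keep a net-bytes counter, and sample it before and after each layer you suspect. A minimal sketch (the `measure_vec_growth` helper is just a demonstration of the sampling pattern, not part of your pipeline):

```rust
use std::alloc::{GlobalAlloc, Layout, System};
use std::sync::atomic::{AtomicUsize, Ordering};

/// Net bytes currently allocated through the global allocator.
static ALLOCATED: AtomicUsize = AtomicUsize::new(0);

struct CountingAlloc;

unsafe impl GlobalAlloc for CountingAlloc {
    unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
        ALLOCATED.fetch_add(layout.size(), Ordering::Relaxed);
        System.alloc(layout)
    }
    unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) {
        ALLOCATED.fetch_sub(layout.size(), Ordering::Relaxed);
        System.dealloc(ptr, layout);
    }
}

#[global_allocator]
static A: CountingAlloc = CountingAlloc;

/// Sample the counter around a suspect piece of code.
fn measure_vec_growth() -> usize {
    let before = ALLOCATED.load(Ordering::Relaxed);
    let v = vec![0u8; 1 << 20]; // stand-in for "one layer of your pipeline"
    let after = ALLOCATED.load(Ordering::Relaxed);
    let grew = after.saturating_sub(before);
    drop(v);
    grew
}

fn main() {
    println!("grew by {} bytes", measure_vec_growth());
}
```

Heap profilers such as heaptrack or valgrind's massif give you a per-callsite breakdown if you need more detail than a single counter.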

I would look at csv-async itself first.

By the way, StreamReader implements AsyncBufRead itself, so you don't need BufReader.
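Concretely, the middle of the pipeline can shrink to this (a sketch; only the wiring changes, the rest of your snippet stays as-is):

```rust
let r = StreamReader::new(resp.bytes_stream().map_err(std::io::Error::other));
// StreamReader already implements AsyncBufRead, so it can feed the
// decoder directly without an intermediate BufReader.
let mut dec = GzipDecoder::new(r);
dec.multiple_members(true);
```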

Thanks. For some reason, removing BufReader reduced the problem but didn't eliminate it. I'll try to isolate the issue further and run an analysis of heap allocations.

Did you rule out that the following code leaks the memory? What happens if you drop the records as you receive them?
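A related thing to try: bypass the owned-record stream entirely and reuse a single `StringRecord`. csv_async's `read_record` refills the same buffers on each call, so if memory still grows this way, per-record allocations are not the culprit. Sketch, assuming `reader` is the `AsyncReader` built in the original snippet:

```rust
let mut record = csv_async::StringRecord::new();
// read_record returns Ok(false) at EOF and reuses `record`'s buffers,
// so per-row allocation stays bounded.
while reader.read_record(&mut record).await? {
    // Inspect `record` here; it is overwritten on the next iteration.
}
```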

Normally in Node or whatever I would suspect backpressure wasn't being correctly propagated back up the chain, but my understanding is that async iteration handles that automatically ...

That may well be the case; I need to test it further.

Tried that, unfortunately it does not help.

It's just a hunch, but did you try without the gzip decompression?

Unfortunately this is not an option, as we load data from an external API.

I did however try to isolate the issue by reading from a file both with and without gzip compression. The difference is about 10 MiB of memory allocation, so gzip has an impact, but it is most likely not the cause of the problem.
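Another way to rule out individual layers: drain a given stage into `tokio::io::sink()` with no CSV parsing at all, and watch the process RSS. If memory still grows while only draining decompressed bytes, the problem sits below csv-async. Sketch, assuming `dec` is the `GzipDecoder` from the original snippet:

```rust
// Drain the decompressed stream without parsing anything.
let mut sink = tokio::io::sink();
let n = tokio::io::copy(&mut dec, &mut sink).await?;
println!("drained {n} decompressed bytes");
```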
