Best way to asynchronously buffer and write items from a stream

I have some code that asynchronously reads data from S3 into a stream of lines, applies a series of transformations to that stream, and ends up with a stream of Item structs.

These structs primarily contain Avro values, which I serialize to bytes and write to S3. The Avro values belong to different entities, so I need to group them and write each group to a different path.

Currently I simply collect the whole stream into memory and then upload the results to S3 in parallel.

This doesn't work when the input data is large, because it no longer fits in memory. I want to change the code so that memory usage stays low: the output bytes should be buffered per entity until they reach some size (e.g. 5 MB) and then uploaded to S3.

What's the general approach here? I keep running into many different options (AsyncWrite, mpsc channels, etc.).

I'd appreciate any guidance or pointers. Thanks!

Are you using an S3 SDK to download and upload the data, or are you writing your own client with async Rust?

If you're rolling your own, you could, for example, download the line data as a byte stream with the reqwest crate, wrap that stream in a tokio-util StreamReader to split it into lines, perform your transformations, and upload the results immediately, again using reqwest.
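With tokio-util, the line-splitting step is roughly `StreamReader::new(stream.map_err(|e| io::Error::new(io::ErrorKind::Other, e)))` followed by `AsyncBufReadExt::lines`. As a sketch of what that step does underneath, here is a std-only version that reassembles lines straddling chunk boundaries. `LineSplitter` is a hypothetical helper, not a library type, and unlike tokio's `lines()` it decodes lossily instead of returning an error on invalid UTF-8.

```rust
/// Hypothetical helper: turns a sequence of byte chunks into complete
/// lines, carrying any partial line over to the next chunk.
struct LineSplitter {
    pending: Vec<u8>, // bytes of a line not yet terminated by '\n'
}

impl LineSplitter {
    fn new() -> Self {
        LineSplitter { pending: Vec::new() }
    }

    /// Feeds one chunk and returns every line it completes,
    /// with the trailing "\n" (or "\r\n") stripped.
    fn push(&mut self, chunk: &[u8]) -> Vec<String> {
        let mut lines = Vec::new();
        for &b in chunk {
            if b == b'\n' {
                let mut line = std::mem::take(&mut self.pending);
                if line.last() == Some(&b'\r') {
                    line.pop();
                }
                lines.push(String::from_utf8_lossy(&line).into_owned());
            } else {
                self.pending.push(b);
            }
        }
        lines
    }

    /// Returns the final line if the input did not end with a newline.
    fn finish(self) -> Option<String> {
        if self.pending.is_empty() {
            None
        } else {
            Some(String::from_utf8_lossy(&self.pending).into_owned())
        }
    }
}
```

Each line is available as soon as its chunk arrives, so you can transform and forward it without ever holding the whole object in memory.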

You could also buffer the serialized Item data in a HashMap keyed by entity and upload each entity's buffer in batches of about 5 MB, as you suggested.
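A minimal sketch of that per-entity buffering, assuming each Item has already been serialized to bytes and tagged with an entity name. `EntityBuffers` and the `flush` callback are hypothetical; in real code the threshold would be something like `5 * 1024 * 1024`, and `flush` would start an upload (for example by spawning a task or sending the buffer down a channel).

```rust
use std::collections::HashMap;

/// Hypothetical per-entity buffer: accumulates serialized bytes for each
/// entity and hands a buffer to `flush` once it reaches `threshold` bytes.
struct EntityBuffers {
    threshold: usize,
    buffers: HashMap<String, Vec<u8>>,
}

impl EntityBuffers {
    fn new(threshold: usize) -> Self {
        EntityBuffers { threshold, buffers: HashMap::new() }
    }

    /// Appends `bytes` to the entity's buffer. If the buffer is now at or
    /// above the threshold, takes it out and passes it to `flush`.
    fn push<F: FnMut(&str, Vec<u8>)>(&mut self, entity: &str, bytes: &[u8], mut flush: F) {
        let buf = self.buffers.entry(entity.to_string()).or_default();
        buf.extend_from_slice(bytes);
        if buf.len() >= self.threshold {
            let full = std::mem::take(buf); // leaves an empty buffer behind
            flush(entity, full);
        }
    }

    /// Flushes every non-empty buffer left over at the end of the stream.
    fn finish<F: FnMut(&str, Vec<u8>)>(self, mut flush: F) {
        for (entity, buf) in self.buffers {
            if !buf.is_empty() {
                flush(&entity, buf);
            }
        }
    }
}
```

Because a buffer is flushed only after it crosses the threshold, an uploaded batch can be slightly larger than the threshold. Peak memory is therefore roughly the threshold times the number of entities, plus whatever uploads are still in flight.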

There is also a wide variety of stream extension methods in futures::stream, or you can write streams directly with the async-stream crate. If you want to limit how much input data is in flight at any one time, one straightforward way is to acquire permits from a tokio Semaphore as you read from the stream and to hold each permit until the corresponding upload has finished.

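A sketch of that idea, inside an async function that returns a boxed error (process is a placeholder for your own transform-and-upload step):

```rust
use std::sync::Arc;
use futures::StreamExt;
use tokio::sync::Semaphore;

// At most 5 MB of input may be in flight: one permit per byte.
let memory_available = Arc::new(Semaphore::new(5 * 1024 * 1024));
// bytes_stream() needs reqwest's "stream" feature.
let mut bytes = reqwest::get(url).await?.bytes_stream();
while let Some(chunk) = bytes.next().await {
    let chunk = chunk?;
    // Waits until earlier chunks have released enough permits.
    // (A single chunk larger than 5 MB would wait forever.)
    let memory_used = memory_available
        .clone()
        .acquire_many_owned(chunk.len() as u32)
        .await?;
    tokio::spawn(async move {
        process(chunk).await; // placeholder: transform + upload
        drop(memory_used); // release the permits after the upload
    });
}
```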
