Help reducing allocations

I have some code that parses some JSON files and converts them into a set of parquet files. It all works great, but I'm hoping the memory usage could be reduced. I would really appreciate any advice on how to do so.

One area where I would really like to reduce allocations is in the deduplication and output section. In summary, I have a struct with fields of interest that I would like to assign integer IDs to unique values, let's call it Key, and write out a parquet file of each unique Key with its ID. I am using Polars to generate the files, and Polars really wants column-ordered data to turn into Series to be turned into a Dataframe; that is, I need a struct of column vectors rather than a vector of row structs.

The code I have uses the once_cell crate to set up a singleton cache for all "global" state. It has a hashmap to dedupe the Keys. The function that looks up the integer ID for a key also inserts it into a column-ordered writer struct that will eventually be used to save to a parquet file if the key has not yet been seen. I have written a simplified version of the code which I append below.

My issue is that every String field needs two copies, one as the key in the key_ids hashmap, and a copy for the KeyCols writer struct. I would very much like to halve my memory usage by doing something clever.

My first thought was just to use Arc<str> instead of String, but I don't think I can get from a Vec<Arc<str>> to a Polars Series. I also experimented with Cow<'a, str>, but couldn't figure out all the lifetime issues, and similarly with just storing references in KeyCols.

I'm not hoping someone to rewrite my code for me, but any advice on how to procede, or any notes on the code at all, would be greatly appreciated.

use std::{collections::HashMap, sync::Mutex};
use once_cell::sync::OnceCell;

// Struct with various fields of interest.
#[derive(Debug, Hash, PartialEq, Eq)]
struct Key {
    a: String,
    b: String,
}

// Column-ordered rows to be turned into a Polars dataframe for writing out to parquet.
#[derive(Debug)]
struct KeyCols {
    key_id: Vec<u64>,
    a: Vec<String>,
    b: Vec<String>,
}

impl KeyCols {
    fn new() -> Self {
        KeyCols {
            key_id: Vec::new(),
            a: Vec::new(),
            b: Vec::new(),
        }
    }

    // Add a row
    fn append(&mut self, key_id: u64, key: &Key) {
        self.key_id.push(key_id);
        self.a.push(key.a.clone());
        self.b.push(key.b.clone());
    }
}

#[derive(Debug)]
struct Caches {
    key_id: u64, // Next unused unique ID
    key_ids: HashMap<Key, u64>, // key to ID map
    key_cols: KeyCols, // columns to be turned into a polars dataframe
}

impl Caches {
    // Return a unique ID for each different Key.
    fn key_id_for_key(&mut self, key: Key) -> u64 {
        *self.key_ids.entry(key).or_insert_with_key(|k| {
            // Not previously seen. Assign a new ID.
            let id = self.key_id;
            // Also insert it into key_cols to be written out.
            self.key_cols.append(id, k);
            self.key_id += 1;
            id
        })
    }
}

// Global singleton with all state that we're accumulating.
static CACHE: OnceCell<Mutex<Caches>> = OnceCell::new();

fn setup_cache() {
    CACHE.set({
        let cache = Caches {
            key_id: 0,
            key_ids: HashMap::new(),
            key_cols: KeyCols::new(),
        };
        Mutex::new(cache)
    })
    .expect("Unable to initialize cache");
}

fn main() {
    setup_cache();
    let mut cache = CACHE.get().unwrap().lock().unwrap();
    let k1 = Key {
        a: "Hello".into(),
        b: "World".into(),
    };
    let id1 = cache.key_id_for_key(k1);

    let k2 = Key {
        a: "Foo".into(),
        b: "Bar".into(),
    };
    let id2 = cache.key_id_for_key(k2);

    dbg!(&cache, id1, id2);
}

The Polars doc shows creating a series from an iterator, using the collect method. Perhaps you could create an iterator from a Vec of Arc<str> and map the iterator elements to &str. I assume the string (the utf8 data) will be copied into the the series.

Edit: Changed Arc<String> to Arc<str> since the latter is usually preferred.

(Edit: This is incorrect, as pointed out below:) And you probably only need Rc, not Arc, if you only access the cache under the mutex.

Not quite. Mutex is Send + Sync only if inner value is Send (since otherwise it could be sent over threads by replacing), and Rc is explicitly not Send, so Mutex<Rc<_>> will be essentially unusable in multithreaded case.

Thanks for the correction.

I tried your suggestion and it worked. But I don't think it's actually saving much memory after all because when I call Series::new("colname", vec.iter().map(|arc| &**arc).collect::<Series>()) I think it must be cloning out the strs. It does save a few percent in persistent allocations.

I'm going to see if I can figure out the lifetime issues with storing just straight references.

Doesn't that (the fact that it didn't help much) mean that the real issue is duplication between the strings in the series and those in the cache? If so, storing references in the cache won't help. I assume the string is copied into the series because Polars is using its own memory format, and storing references in the cache won't change that.

Are you trying to reduce the maximum memory usage during the creation of the series? If so, the only thing you can do (assuming the strings are copied into the series) is to create the series' as incrementally as possible -- create one series, discard its data from the cache, create the next series, etc.

If you are trying to reduce the memory usage after all the series have been created, the only thing is to ensure the cache has been discarded.

I'm going to see if I can figure out the lifetime issues with storing just straight references.

I think this may be very difficult. The cache data structures are mutable, so the address (reference) of the strings will change as modifications take place.

You may be able to store a string in one data structure and some kind of id for that string in the other data structure. But I didn't see a way to do that with the data structures you're using -- I think you would need different data structures entirely. And I don't see how this would be much of an improvement over Arc<str>.

Another possibility is that the real problem is the duplication of Strings in the JSON data in memory and the Strings in the cache. Can the incoming JSON data be streamed, or does it all have to be in memory at once?

It would help to make a list of what is in memory at each phase.

Have you also considered creating the DataFrame directly from the JSON data, using unique to deduplicate, and then assign the integer IDs? This may not work as I don't know hardly anything about DataFrames, just thought I should ask in case you hadn't considered it.

You're right, I don't see a way to just use straight references. After sprinkling lifetimes around, I'm stuck where I hand the owned key to self.key_ids.entry(key) in key_id_for_key. The reference to the owned key passed to or_insert_with_key() is only valid within the closure body, but needs to be borrowed for the lifetime of the hashmap in the cache.

The data is being deserialized from JSON with json_serde, and using custom deserializers to retain only what is necessary. Sometimes the actual JSON structures must be retained because of forward reference issues, but in most cases they are immediately transformed into Key structs and deduped/assigned an ID, then tossed. So the bulk of the memory is in the hash, and any copies needed to create the Series for the DataFrame.

I'm not seeing any way to do much better than the Arc route.

You could take a stab at using a streaming parser like Struson (to pick a random lib.rs result) or a custom serde Deserializer to parse directly into your column vector, that will reduce your memory by about half since you only have your complete data in memory once.

Another trick is to only keep the key cache in memory, and directly write the column data to disk, presumably directly to parquet so you can natively lazy load and process it in polar. With both of these, you only have the keys in memory and a single row.

Your key structure could probably use a fancier data structure, perhaps a trie, to reduce usage, but that would be data dependant and out of my experience.

If you want still lower memory usage, you could also offload the key cache to disk, but since you need to query and update it, you need some sort of db: SQlite obviously, but Sled seems like a neat approach if you're feeling saucy.

I'm already streaming the JSON with custom deserializers, as I mentioned in my previous comment. Only relevant data is being kept in the Key cache.

There is really no good way to append to a parquet file a row at a time. I mean, you could, but it would result in a parquet file with a gajillion row groups, which is bad, slow, and inefficient.

One area I could remove some duplication is if I could get the KeyCol struct to directly store Series and push on values into those. Then I would just have one copy ever instead of having to aggregate all the rows into column vecs and then transform those to Series. But after searching the polars doc for a while, I do not see any way of doing such a thing. There doesn't appear to be any way of pushing a single value into a Series, it really does want to work with large batches of data.

Ah, I didn't see that, sorry!

I'm sad to hear you can't append parquet efficiently: perhaps you could batch, but I'm not sure what the trade-offs are like there.

It seems like you can append to a polars ChunkedArray by creating a new chunked array, so that might be an easier path forwards too.

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.