How to reduce memory consumption?

Hi,

I tried to build a script that loads a CSV and identifies and maps connected components. For example, this input:
"id" | "phone number"
1 | 123
2 | 123
2 | 234
3 | 234
4 | 555
5 | 555
5 | 666
6 | 777
would return:

"id" | "shared_ids"
1 | [1, 2, 3]
2 | [1, 2, 3]
3 | [1, 2, 3]
4 | [4, 5]
5 | [4, 5]

So I want to show the full chain for everyone in the chain.

With quite a bit of help from ChatGPT (at some points I was pretty stuck) I created this script: GitHub

The idea was to load the CSV values into vectors and then use HashMaps and HashSets in the rest of the script for speed, while also using references as much as possible to keep memory consumption low.

The speed looks good (it ran faster than my Python equivalent; see the write-up on Medium, if interested). However, the memory consumption looks wasteful: the source file is 120 MB, but the program peaks at 3 GB of memory (in Python it was roughly 2.1 GB).

I am wondering where so much memory is wasted. I suspect there is too much cloning, but I did not see how to prevent it.

I would be super happy to have some feedback on this :slight_smile:

Not the answer, just a couple of not-really-necessary quick notes.

Quite a weird choice. SipHash seems a bit slow, although I didn’t really consider the use case. You might want to take a look at the rustc-hash crate.

Seems like a pretty inappropriate word.

Line 23 looks like a waste of memory. Lines 40-42 look a bit weird, try zipping iterators instead. You seem to use a lot of collections, and some uses might be unnecessary. Did not fully analyze the program, unfortunately. Hopefully I’m not just wasting your time.

2 Likes

First of all thanks for taking the time to take a look. Much appreciated.

Quite a weird choice. SipHash seems a bit slow, although I didn’t really consider the use case. You might want to take a look at the rustc-hash crate.

I tried multiple hashing algorithms. The reason the standard HashMap has won the race so far is the with_capacity functionality. I will try rustc-hash though.
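A minimal sketch of what I plan to try, assuming rustc-hash's FxHasher can be combined with the standard HashMap while keeping the pre-sizing:

use std::collections::HashMap;
use std::hash::BuildHasherDefault;
use rustc_hash::FxHasher;

// Same pre-sizing as before, but with FxHasher instead of the default SipHash.
type FxMap<'a> = HashMap<&'a str, Vec<&'a str>, BuildHasherDefault<FxHasher>>;

fn make_map<'a>(capacity: usize) -> FxMap<'a> {
    HashMap::with_capacity_and_hasher(capacity, BuildHasherDefault::default())
}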

Seems like a pretty inappropriate word.

Happy to learn what would describe it better.

let record: Vec<&str> = record.split(',').collect();

This is definitely a hint I had not even considered. Could you elaborate on why this might waste memory? What would be a better way to do it?

Lines 40-42 look a bit weird, try zipping iterators instead. You seem to use a lot of collections, and some uses might be unnecessary.

I will check that out.

Thanks a lot for the input.

How long are the entries/identifiers? Each String and Vec<&str> takes 24 bytes and each &str pointing into it takes another 16 on 64-bit platforms. If your CSV entries have an average length of x ASCII characters, you already get an overhead factor of (24 + 24 + (n+1)*16) / x, where n is the number of times the string appears as a mapping target. For n = 2 and x = 10 this is roughly a factor of 10.
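To make those per-value sizes concrete, here is a quick check (a sketch; the numbers assume a 64-bit target):

use std::mem::size_of;

fn main() {
    // On 64-bit targets: String and Vec<&str> are pointer + length + capacity,
    // &str is pointer + length.
    assert_eq!(size_of::<String>(), 24);
    assert_eq!(size_of::<Vec<&str>>(), 24);
    assert_eq!(size_of::<&str>(), 16);
}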

Additionally, hash tables need some extra memory to minimize hash collisions. I forgot how much that was, though...

EDIT: You can use interned strings, as provided by the string-interner crate, or build a simple interner yourself. You can use u32 as the symbol type as long as the number of distinct strings is smaller than 2^32.
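A minimal hand-rolled interner along those lines could look like this (just a sketch, not the crate's API; it stores each string twice for simplicity):

use std::collections::HashMap;

// Maps each distinct string to a u32 symbol and back.
#[derive(Default)]
struct Interner {
    symbols: HashMap<String, u32>,
    strings: Vec<String>,
}

impl Interner {
    fn intern(&mut self, s: &str) -> u32 {
        if let Some(&sym) = self.symbols.get(s) {
            return sym;
        }
        let sym = self.strings.len() as u32;
        self.symbols.insert(s.to_owned(), sym);
        self.strings.push(s.to_owned());
        sym
    }

    fn resolve(&self, sym: u32) -> &str {
        &self.strings[sym as usize]
    }
}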

1 Like

The strings are somewhere between 5 and 20 characters. The chains can get long though (several hundred entries).

I thought about the following alternative:
What if I map a unique integer to each unique entity and identifier at the beginning and refer to these integers for the rest of the script?

This would create some overhead as I need the mapping and also the reverse-translation at the end, but it would save a big chunk of memory I guess?

I tried rustc-hash, but it is significantly slower for this use case. Its runtime was also quite unstable compared to SipHash.

You’re creating a vector every iteration. You’re also indexing that vector, which might decrease performance a bit. Instead, you can try to get both entity and identity from the iterator without collecting it. nth is not a way out; you’ll likely need to use enumerate and add several checks. This might also improve performance if you use something like take_while to stop iterating early, in case neither index is the last field in the line.
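Roughly something like this (a sketch, untested; `entity_index` and `identity_index` are the header positions found earlier, and the early break plays the role of take_while):

// Pick the two wanted fields from a CSV line without collecting the whole split.
fn pick_two(line: &str, entity_index: usize, identity_index: usize) -> Option<(&str, &str)> {
    let mut entity = None;
    let mut identity = None;
    for (i, field) in line.split(',').enumerate() {
        if i == entity_index {
            entity = Some(field);
        }
        if i == identity_index {
            identity = Some(field);
        }
        // Stop as soon as both columns have been seen.
        if entity.is_some() && identity.is_some() {
            break;
        }
    }
    Some((entity?, identity?))
}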

Hard to believe, actually. Perhaps someone else can explain the reason; I cannot.

3 Likes

What if I map a unique integer to each unique entity and identifier at the beginning and refer to these integers for the rest of the script?

This is basically what string interning means. The simplest approach is to use an IndexSet of &str or Box<str> and store the indices as u32. Don't worry about the overhead: the reduced memory usage typically makes programs faster, because more of the relevant data fits in the cache.
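A sketch of that idea with the indexmap crate (insert_full returns the index of the entry, get_index translates back):

use indexmap::IndexSet;

fn main() {
    // Intern strings: each distinct value gets a stable index that fits in a u32.
    let mut interner: IndexSet<Box<str>> = IndexSet::new();

    let a = interner.insert_full(Box::from("123")).0 as u32;
    let b = interner.insert_full(Box::from("123")).0 as u32;
    assert_eq!(a, b); // same string, same symbol

    // Reverse translation at the end.
    let back: &str = interner.get_index(a as usize).unwrap();
    assert_eq!(back, "123");
}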

After skimming through the rest of your code: are you sure you are using an optimal algorithm? It looks like a graph connectivity problem.

And you are writing a program, not a script :wink:

EDIT:
I think your Python script runs in O(n * k), where n is the number of entries and k is the size of the largest connected component. I think you can do O(n) with this:

Python snippet
fwd = {}
bwd = {}

for a, b in network:
    fwd.setdefault(a, set()).add(b)
    bwd.setdefault(b, set()).add(a)

sets = []
k_to_set = {}

while fwd:
    k, vs = fwd.popitem()
    current_set = {k}
    k_to_set[k] = current_set
    
    stack = list(vs)
    while stack:
        v = stack.pop()
        ks = bwd.pop(v, ())
        for k in ks:
            if k in current_set:
                pass
            else:
                current_set.add(k)
                k_to_set[k] = current_set
                stack.extend(fwd.pop(k, ()))
    sets.append(current_set)
    
    # in principle we can write to disk here

You can translate that to Rust with just a few modifications.
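A rough translation sketch (untested; it assumes the strings have already been mapped to u32 symbols, and it stores an index into `sets` instead of sharing set objects between keys):

use std::collections::{HashMap, HashSet};

// `network` is the list of (entity, identifier) edges.
fn components(network: &[(u32, u32)]) -> (Vec<HashSet<u32>>, HashMap<u32, usize>) {
    let mut fwd: HashMap<u32, Vec<u32>> = HashMap::new();
    let mut bwd: HashMap<u32, Vec<u32>> = HashMap::new();
    for &(a, b) in network {
        fwd.entry(a).or_default().push(b);
        bwd.entry(b).or_default().push(a);
    }

    let mut sets: Vec<HashSet<u32>> = Vec::new();
    let mut k_to_set: HashMap<u32, usize> = HashMap::new();

    while !fwd.is_empty() {
        let start = *fwd.keys().next().unwrap();
        let mut stack = fwd.remove(&start).unwrap();
        let set_idx = sets.len();
        let mut current = HashSet::from([start]);
        k_to_set.insert(start, set_idx);

        while let Some(v) = stack.pop() {
            for k in bwd.remove(&v).unwrap_or_default() {
                if current.insert(k) {
                    k_to_set.insert(k, set_idx);
                    stack.extend(fwd.remove(&k).unwrap_or_default());
                }
            }
        }
        // in principle we could write `current` to disk here instead
        sets.push(current);
    }
    (sets, k_to_set)
}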

2 Likes

I removed a few unnecessary clones from your code. Hope this further helps to reduce the memory consumption:

use std::collections::{HashMap, HashSet};
use std::env;
use std::fs::File;
use std::io::{self, BufRead};
use std::time::Instant;

fn load_mappings_from_csv(filename: &str, entity_col: &str, identity_col: &str) -> (Vec<String>, Vec<String>) {
    let file = File::open(filename).expect("Could not open file");
    let reader = io::BufReader::new(file);
    let mut lines = reader.lines();

    let header_row = lines.next().expect("No header row").expect("Error reading header");
    let headers: Vec<&str> = header_row.split(',').collect();

    let entity_index = headers.iter().position(|&x| x == entity_col).expect("Entity column not found");
    let identity_index = headers.iter().position(|&x| x == identity_col).expect("Identity column not found");

    let mut vec_entities = Vec::new();
    let mut vec_identifiers = Vec::new();

    for line in lines {
        let record = line.expect("Error reading line");
        let record: Vec<&str> = record.split(',').collect();

        let entity_val = String::from(record[entity_index]);
        let identifier_val = String::from(record[identity_index]);

        vec_entities.push(entity_val);
        vec_identifiers.push(identifier_val);
    }

    (vec_entities, vec_identifiers)
}

fn first_hop<'a>(vec_entities: &'a Vec<String>, vec_identifiers: &'a Vec<String>, capacity: usize) -> HashMap<&'a str, Vec<&'a str>> {
    let mut entity_to_identifier: HashMap<&str, Vec<&str>> = HashMap::with_capacity(capacity);
    let mut identifier_to_entity: HashMap<&str, Vec<&str>> = HashMap::new();
    let mut entity_to_entity: HashMap<&str, HashSet<&str>> = HashMap::with_capacity(capacity); // Use HashSet for deduplication

    for i in 0..vec_entities.len() {
        let entity_key = &vec_entities[i];
        let identifier = &vec_identifiers[i];

        // fill entities mapping
        entity_to_identifier.entry(entity_key.as_str()).or_insert_with(Vec::new).push(identifier.as_str());

        // fill identifiers mapping
        identifier_to_entity.entry(identifier.as_str()).or_insert_with(Vec::new).push(entity_key.as_str());

        // fulfill the 1st hop
        if let Some(entity_set) = entity_to_entity.get_mut(entity_key.as_str()) {
            if let Some(entity_vec) = identifier_to_entity.get(identifier.as_str()) {
                entity_set.extend(entity_vec.iter().cloned());
            } else {
                entity_set.insert(entity_key.as_str());
            }
        } else {
            if let Some(entity_vec) = identifier_to_entity.get(identifier.as_str()) {
                let entity_set: HashSet<&str> = entity_vec.iter().cloned().collect();
                entity_to_entity.insert(entity_key.as_str(), entity_set);
            }
        }
    }

    // Filter and collect only non-empty sets where k is not the only value in v
    let entity_to_entity: HashMap<&str, Vec<&str>> = entity_to_entity
        .into_iter()
        .filter(|(k, v)| {
            !v.is_empty() &&
            v.len() > 1 ||
            v.iter().any(|&x| x != *k)
        })
        .map(|(k, v)| (k, v.into_iter().collect()))
        .collect();

    entity_to_entity
}

fn multihop_iter<'a>(
    entity_to_entity: HashMap<&'a str, Vec<&'a str>>,
    mut shared_entities_length: HashMap<&'a str, usize>,
    capacity: usize
) -> (HashMap<&'a str, Vec<&'a str>>, HashMap<&'a str, usize>, bool) {
    let mut entity_to_entity_enhanced: HashMap<&str, HashSet<&str>> = HashMap::with_capacity(capacity);
    let mut any_chain_got_longer: bool = false;

    for (entity, shared_entities) in entity_to_entity.iter() {
        let mut all_entities: HashSet<&str> = HashSet::new();

        for shared_entity in shared_entities.iter() {
            if let Some(entity_vec) = entity_to_entity.get(shared_entity) {
                all_entities.extend(entity_vec.iter().cloned());
            }
            if let Some(shared_entity_vec) = entity_to_entity.get(entity) {
                all_entities.extend(shared_entity_vec.iter().cloned());
            }
            if let Some(already_added_set) = entity_to_entity_enhanced.get(entity) {
                all_entities.extend(already_added_set.iter().cloned());
            }
            if let Some(already_added_set) = entity_to_entity_enhanced.get(shared_entity) {
                all_entities.extend(already_added_set.iter().cloned());
            }

            entity_to_entity_enhanced.insert(entity, all_entities.clone());
            entity_to_entity_enhanced.insert(shared_entity, all_entities.clone());

            let chain_size_before = shared_entities_length.get(shared_entity).copied().unwrap_or(0);
            let chain_size_after = all_entities.len();
            shared_entities_length.insert(shared_entity, chain_size_after);

            if chain_size_after > chain_size_before {
                any_chain_got_longer = true;
            }
        }
    }

    let entity_to_entity_enhanced: HashMap<&str, Vec<&str>> = entity_to_entity_enhanced
        .into_iter()
        .map(|(k, v)| (k, v.into_iter().collect()))
        .collect();

    (entity_to_entity_enhanced, shared_entities_length, any_chain_got_longer)
}


fn main() {
    let now = Instant::now();
    let args: Vec<String> = env::args().collect();

    if args.len() < 4 {
        eprintln!("Usage: {} <filename> <entity_col> <identity_col>", &args[0]);
        std::process::exit(1);
    }

    let filename = &args[1];
    let entity_col = &args[2];
    let identity_col = &args[3];

    println!("Start storing csv data in vectors.");
    let nodes_edges = load_mappings_from_csv(filename, entity_col, identity_col);
    let vec_entities: Vec<String> = nodes_edges.0;
    let vec_identifiers: Vec<String> = nodes_edges.1;
    let capacity: usize = vec_entities.len();

    println!("Calculate first hop");
    let mut entity_to_entity = first_hop(&vec_entities, &vec_identifiers, capacity);

    // executing the first hop
    let mut any_chain_got_longer: bool = true;
    let mut shared_entities_length: HashMap<&str, usize> = HashMap::with_capacity(capacity);

    while any_chain_got_longer {
        println!("Calculate iteration in multihop");
        let result = multihop_iter(entity_to_entity, shared_entities_length, capacity);
        entity_to_entity = result.0;
        shared_entities_length = result.1;
        any_chain_got_longer = result.2;
    }

    let elapsed = now.elapsed();
    println!("Elapsed: {:.2?}", elapsed);

}
1 Like

You are right. This is a graph connectivity problem.

The algorithm should be faster than O(n * k). One question about the solution you shared:
What format is network expected to be? Nodes and edges? It looks interesting :slight_smile:

Oh wow. The memory peak was about the same, but the algorithm got faster.

I think all of them should have been caught by Clippy. Are you running Clippy on your project?

3 Likes

I have never heard of Clippy before (I am fairly new to Rust; this is basically my second program).

@Bruecki
I tried to use your Python program:

import csv
import pandas as pd
from typing import Dict, List, Union

def create_synthetic_data():
    synth = pd.DataFrame(
        {
            "user": ["A", "B", "B", "C", "C", "D", "D", "E", "E", "E", "F", "F", "G",
                     "M", "N", "N", "O", "O", "P"],
            "message_id": ["12", "12", "13", "13", "1045", "1045", "2095", "2095", "883", "6634", "6634", "7777", "7777",
                           "7768", "7768", "8998", "7768", "9000", "9000"]
        }
    )
    return synth

synth = create_synthetic_data()

fwd = {}
bwd = {}

for a, b in zip(synth["user"], synth["message_id"]):
    fwd.setdefault(a, set()).add(b)
    bwd.setdefault(b, set()).add(a)

sets = []
k_to_set = {}

while fwd:
    k, vs = fwd.popitem()
    current_set = {k}
    k_to_set[k] = current_set

    stack = list(vs)
    while stack:
        v = stack.pop()
        ks = bwd.pop(v, ())
        for k in ks:
            if k in current_set:
                pass
            else:
                current_set.add(k)
    sets.append(current_set)

Probably I made some mistake when adapting it. So far I see the following results as output...

k_to_set:

{'P': {'O', 'P'},
'O': {'M', 'N', 'O'},
'N': {'N'},
'M': {'M'},
'G': {'F', 'G'},
'F': {'E', 'F'},
'E': {'D', 'E'},
'D': {'C', 'D'},
'C': {'B', 'C'},
'B': {'A', 'B'},
'A': {'A'}}

and sets:

[{'O', 'P'},
{'M', 'N', 'O'},
{'N'},
{'M'},
{'F', 'G'},
{'E', 'F'},
{'D', 'E'},
{'C', 'D'},
{'B', 'C'},
{'A', 'B'},
{'A'}]

The desired output would be the following though:

{'A': ['A', 'F', 'E', 'C', 'D', 'G', 'B'],
'B': ['A', 'F', 'E', 'C', 'D', 'G', 'B'],
'C': ['A', 'F', 'E', 'C', 'D', 'G', 'B'],
'D': ['A', 'F', 'E', 'C', 'D', 'G', 'B'],
'E': ['A', 'F', 'E', 'C', 'D', 'G', 'B'],
'F': ['A', 'F', 'E', 'C', 'D', 'G', 'B'],
'G': ['A', 'F', 'E', 'C', 'D', 'G', 'B'],
'M': ['P', 'N', 'O', 'M'],
'N': ['P', 'N', 'O', 'M'],
'O': ['P', 'N', 'O', 'M'],
'P': ['P', 'N', 'O', 'M']}

So basically for each entity I can see all directly and indirectly connected entities (multihop).

You missed stack.extend(fwd.pop(k, ())).

I used an equivalent of zip(synth["user"], synth["message_id"]) as network.

1 Like

Wait, so there's actually no need to keep those duplicate lists. You can repeatedly merge subsegments in your working data copy.

The result could be something like:

2 Likes

@Bruecki : That runs 10 times faster than my solution, awesome!

2 Likes

What I'd do is just maintain a single, separate mapping from textual entries to very simple unique IDs (e.g. sequential u32s) for creating the graph, and then project out the useful information on-demand. I would also make the mapping more compact by storing all strings in a single allocation, and only handing out small (position, length) ranges, such as in this Playground:

use std::iter;
use std::mem;
use std::num::TryFromIntError;

#[derive(Debug)]
struct CompactStringMap {
    buffer: String,
    ranges: Vec<EntryRanges>,
    idx_by_key: Vec<u32>,
}

#[derive(Clone, Copy, Debug)]
struct EntryRanges {
    pos: u32,
    id_len: u16,
    key_len: u16,
}

impl CompactStringMap {
    fn with_capacity(cap: usize) -> Self {
        CompactStringMap {
            buffer: String::with_capacity(cap),
            ranges: Vec::with_capacity(cap / 8),
            idx_by_key: Vec::new(),
        }
    }

    /// Returns the index of the new entry.
    fn push_entry(&mut self, id: &str, key: &str) -> Result<u32, TryFromIntError> {
        let start = self.buffer.len();
        self.buffer.push_str(id);
        let mid = self.buffer.len();
        self.buffer.push_str(key);
        let end = self.buffer.len();

        let pos: u32 = start.try_into()?;
        let id_len = u16::try_from(mid - start)?;
        let key_len = u16::try_from(end - mid)?;
        let ranges = EntryRanges { pos, id_len, key_len };
        let index: u32 = self.ranges.len().try_into()?;

        self.ranges.push(ranges);

        Ok(index)
    }

    fn len(&self) -> u32 {
        self.ranges.len() as u32 // safe: we never grow beyond `u32::MAX` items
    }

    /// Create the inverse mapping by sorting the records.
    fn build_inverse(&mut self) {
        let len = self.len();
        let mut idx_by_key = mem::take(&mut self.idx_by_key); // re-use allocation, if any

        idx_by_key.clear();
        idx_by_key.extend(0..len);
        idx_by_key.sort_unstable_by(|&i, &j| {
            let (_, x) = self.get(i).expect("OOB in sort_inverse");
            let (_, y) = self.get(j).expect("OOB in sort_inverse");
            x.cmp(y)
        });

        self.idx_by_key = idx_by_key;
    }

    fn get(&self, index: u32) -> Option<(&str, &str)> {
        let &range = self.ranges.get(index as usize)?;
        Some(self.get_range(range))
    }

    fn entries(&self) -> impl Iterator<Item = (&str, &str)> {
        self.ranges.iter().map(|&r| self.get_range(r))
    }

    fn indices_for_key<'a>(&'a self, key: &'a str) -> impl Iterator<Item = u32> + 'a {
        let idx = self
            .idx_by_key
            .binary_search_by_key(
                &key,
                |&r| self.get(r).expect("OOB in get_idx_by_key").1
            );

        match idx {
            Ok(idx) => {
                let head = self.idx_by_key[..idx]
                    .iter()
                    .rev()
                    .copied()
                    .take_while(move |&i| self.get(i).unwrap().1 == key);

                let tail = self.idx_by_key[idx..]
                    .iter()
                    .copied()
                    .take_while(move |&j| {
                        let Some((_, k)) = self.get(j) else { return false };
                        k == key
                    });

                Box::new(head.chain(tail)) as Box<dyn Iterator<Item = u32> + 'a>
            }
            Err(_) => {
                Box::new(iter::empty()) as Box<dyn Iterator<Item = u32> + 'a> // not found
            }
        }
    }

    fn ids_for_key<'a>(&'a self, key: &'a str) -> impl Iterator<Item = &'a str> + 'a {
        self.indices_for_key(key).filter_map(|i| self.get(i).map(|(id, _key)| id))
    }

    /// Private helper
    fn get_range(&self, range: EntryRanges) -> (&str, &str) {
        let start = range.pos as usize;
        let mid = start + range.id_len as usize;
        let end = mid + range.key_len as usize;
        (&self.buffer[start..mid], &self.buffer[mid..end])
    }
}
2 Likes

Finding the transitive closure of a graph is a well-studied problem; have you researched different algorithms and their properties?

1 Like

Hi,

that is a good point. My plan was to first build some intuition of my own (that part is done). The next step is indeed to research the available algorithms and implementations.

If you have a good source to recommend, I am happy to hear it :slight_smile: