Dear all,
I have a question about memory consumption. I am running tasks in a thread pool, and the memory consumption is very, very high: the final result is typically larger than 40 MB, but the process uses far more than 60 GB.
This surprises me, because the threads should free their memory once they finish, but they do not seem to.
The actual routine filters a data set. Intermediate parts are created and freed within each thread, and the filtered data is then sent to a channel.
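For reference, a minimal std-only sketch of that pattern: each worker filters its own chunk, clones only the surviving items, and sends them over an `mpsc` channel, while the parent thread collects them. It uses `std::thread` instead of the `threadpool` crate, and `Item`, `filter_in_workers` and the even-sum predicate are hypothetical placeholders, not the real routine.

```rust
use std::sync::mpsc::channel;
use std::sync::Arc;
use std::thread;

/// Hypothetical stand-in for `LargeDataStructure`.
type Item = Vec<u64>;

/// Split `data` into chunks, filter each chunk in its own thread and
/// collect the surviving items on the calling thread (order not preserved).
fn filter_in_workers(data: Vec<Item>, n_workers: usize) -> Vec<Item> {
    let n_workers = n_workers.max(1);
    let data = Arc::new(data);
    // ceil(len / n_workers), but at least 1 so `step_by` never gets 0
    let chunk = ((data.len() + n_workers - 1) / n_workers).max(1);
    let (tx, rx) = channel();
    let mut handles = Vec::new();
    for start in (0..data.len()).step_by(chunk) {
        let data = Arc::clone(&data);
        let tx = tx.clone();
        handles.push(thread::spawn(move || {
            let end = (start + chunk).min(data.len());
            // Only items that pass the (hypothetical) predicate are cloned;
            // everything else allocated here is dropped when the closure ends.
            let kept: Vec<Item> = data[start..end]
                .iter()
                .filter(|item| item.iter().sum::<u64>() % 2 == 0)
                .cloned()
                .collect();
            tx.send(kept).unwrap();
        }));
    }
    // Drop the parent's sender so the `rx` loop ends once all workers are done.
    drop(tx);
    let mut result = Vec::new();
    for part in rx {
        result.extend(part);
    }
    for h in handles {
        h.join().unwrap();
    }
    result
}
```

Because results arrive in whatever order the workers finish, a caller that needs a stable order would have to sort or tag each part with its chunk index.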
I checked it with heaptrack and saw linear memory growth. Linear growth by itself is expected, but the slope is far too steep.
I expected the threads to release the memory they used once they finish, but that does not seem to happen. What is wrong here?
This is the code:
```rust
// `set` is the input: a Vec<Vec<LargeDataStructure>>.
// `intermediate`, `final_size`, `nb_of_sets` and `batch_nb` come from the surrounding code.
// The basic routine
let tuple_gen = Arc::new(Mutex::new(TupleGenerator::new(&set)));
// share the input set between the worker threads
let set = Arc::new(set);
// I divide the `set` into chunks and process each chunk in a pool thread
let batch_size = calc_batch_size(&final_size, &num_cpus::get(), nb_of_sets);
// results are collected on the parent thread, so no Arc<Mutex<..>> is needed
let mut result: Vec<Vec<LargeDataStructure>> = Vec::new();
// leave one core for the parent thread, but never create a pool of size 0
let pool = ThreadPool::new((num_cpus::get() - 1).max(1));
let (tx, rx) = channel();
for _bn in 0..batch_nb + 1 {
    let intermediate_ = intermediate.clone();
    let set_ = set.clone();
    let tuple_gen_ = tuple_gen.clone();
    let tx_ = tx.clone();
    pool.execute(move || {
        let tuples = tuple_gen_.lock().unwrap().get_next_batch();
        let mut tmp_result: Vec<Vec<LargeDataStructure>> = vec![];
        // this for-loop consumes most of the memory
        for tuple in tuples {
            // build a tuple of LargeDataStructure values from the index tuple
            let mut tmp = vec![];
            for (j, index) in tuple.iter().enumerate() {
                tmp.push(set_[j][*index].clone());
            }
            // if CONSTRAINTS are given, check them
            if intermediate_.has_suchthat {
                let left = get_left_side(&tmp, &intermediate_.terms); // -> LargeDataStructure
                let right = get_right_side(&tmp, &intermediate_.terms); // -> LargeDataStructure
                if left == right {
                    tmp_result.push(tmp);
                }
            } else {
                tmp_result.push(tmp);
            }
        }
        // everything except `tmp_result` should be freed here;
        // the result is sent to the parent thread
        tx_.send(tmp_result).unwrap();
    });
}
for _ in 0..batch_nb + 1 {
    let tmp_result = rx.recv().unwrap();
    result.extend(tmp_result);
}
Ok(result)
```
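For comparison, a sketch of the inner loop that tests the constraint on borrowed references and clones only the elements of tuples that pass, instead of cloning every element of every tuple first. It assumes `get_left_side`/`get_right_side` could be rewritten to take `&[&LargeDataStructure]`; `Item`, `left_side`, `right_side` and `filter_tuples` are hypothetical stand-ins, not the real types or functions.

```rust
/// Hypothetical stand-in for `LargeDataStructure`.
#[derive(Clone, Debug, PartialEq)]
struct Item(Vec<u64>);

/// Hypothetical stand-ins for `get_left_side` / `get_right_side` that take
/// borrowed elements instead of an owned Vec.
fn left_side(parts: &[&Item]) -> u64 {
    parts.first().map_or(0, |p| p.0.iter().sum::<u64>())
}

fn right_side(parts: &[&Item]) -> u64 {
    parts.iter().skip(1).map(|p| p.0.iter().sum::<u64>()).sum()
}

/// Build each tuple from references into `set`, test the constraint,
/// and clone the elements only for tuples that survive the filter.
fn filter_tuples(set: &[Vec<Item>], tuples: &[Vec<usize>]) -> Vec<Vec<Item>> {
    let mut kept = Vec::new();
    // reused across iterations, so no per-tuple allocation for rejected tuples
    let mut refs: Vec<&Item> = Vec::new();
    for tuple in tuples {
        refs.clear();
        refs.extend(tuple.iter().enumerate().map(|(j, &index)| &set[j][index]));
        if left_side(&refs) == right_side(&refs) {
            kept.push(refs.iter().map(|&item| item.clone()).collect());
        }
    }
    kept
}
```

How much this helps depends on how many tuples are rejected; it only avoids clones that would otherwise be dropped immediately.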