Memory leak in hashmap with tokio rwlock

Hello,
I'm confused about how to deal with a memory leak in Rust.

I'd like to write an async function that periodically updates the state with the latest values from a data store.
The code looks like this:

use async_trait::async_trait;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::RwLock;
use tokio::time;

struct MyState {
    value: i32,
}

type Result<T> = std::result::Result<T, std::io::Error>;

#[async_trait]
trait MyStore {
    async fn get_latest_state(&self, id: u32) -> Result<MyState>;
}

struct MyStoreImpl {}

#[async_trait]
impl MyStore for MyStoreImpl {
    async fn get_latest_state(&self, _id: u32) -> Result<MyState> {
        Ok(MyState { value: 10 })
    }
}

async fn update_state(
    id: u32,
    state: Arc<RwLock<HashMap<u32, MyState>>>,
    store: Arc<dyn MyStore + Send + Sync>,
) -> Result<()> {
    // Refresh the cached entry for `id` every 100 ms.
    let mut interval = time::interval(Duration::from_millis(100));
    loop {
        interval.tick().await;
        let new_state = store.get_latest_state(id).await?;
        if let Some(state) = state.write().await.get_mut(&id) {
            *state = new_state;
        }
    }
}

#[tokio::main]
async fn main() {
    let app = Arc::new(MyStoreImpl {});
    let state = Arc::new(RwLock::new(HashMap::new()));
    update_state(1, state, app).await;
}
And here is the Cargo.toml:

[package]
name = "leak"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
async-trait = "0.1.68"
tokio = { version = "1.28.2", features = ["full"] }

However, when I run this code in Kubernetes, the memory usage grows and the pod eventually gets OOMKilled.

I searched for some tools and used Valgrind. There was nothing reported as "definitely lost", but some blocks were "possibly lost" in HashMap.

Here are some of the results; I don't have the full output in text form right now.

12,688,316 bytes in 15,168 blocks are possibly lost in loss record 224 of 224.
malloc
...
hashbrown::raw::alloc::inner::do_alloc (alloc.rs.95)
...
hashbrown::rustc_entry::<impl hashbrown::map::HashMap...::rustc_entry
std::collections::hash::map::HashMap<K,V,S>::entry (map.rs:855)

After looking at the results, I still have no idea how to resolve it.

When I modified the way the entry stored in the HashMap is updated, the memory usage no longer grows as before, and it seems the memory leak was resolved.

// before
if let Some(state) = state.write().await.get_mut(&id) {
    *state = new_state;
}

// after
state
    .write()
    .await
    .get_mut(&id)
    .and_then(|s| Some(*s = new_state));

I don't understand why these two versions result in different memory footprints.

Please post a minimal, compiling, reproducible example. Your code even has syntax errors (using instead of use, a missing closing angle bracket, etc.) and type errors (the return type is Result, but you aren't returning anything), which means that this is definitely not the real code you are running.

Attempting a minimal reproduction unsurprisingly shows that the state is being dropped, which means that the HashMap must have been dropped as well. There's likely no memory leak.

In general, tools like Valgrind are not very useful in Rust unless you have unsafe code, because you can't cause memory unsafety in safe Rust, and memory leaks are the result of fairly apparent practices (Rc cycles, Box::leak, or static collections that can grow). I strongly suspect that Valgrind is simply wrong in signalling a memory leak here.
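
For illustration, here is a minimal sketch (not related to your code) of the kind of leak that safe Rust does allow: a reference cycle between two Rc values, where neither strong count ever reaches zero, so both allocations stay alive for the lifetime of the process.

use std::cell::RefCell;
use std::rc::Rc;

struct Node {
    // A link to another node; if two nodes point at each other,
    // the strong counts never drop to zero.
    other: RefCell<Option<Rc<Node>>>,
}

fn main() {
    let a = Rc::new(Node { other: RefCell::new(None) });
    let b = Rc::new(Node { other: RefCell::new(Some(a.clone())) });
    *a.other.borrow_mut() = Some(b.clone());
    // `a` and `b` go out of scope here, but the cycle keeps both
    // allocations reachable from each other: a leak in safe Rust.
}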


Thank you for taking the time to help me.
I'm new to Rust, and I'm sorry for the incomplete and incorrect initial code example. I fixed it; this is the link to the playground. However, it is not the entire code I run on k8s.

I also think Rust guarantees memory safety, but that does not mean Rust is free of memory leaks, even without using unsafe code.
And I found this post, solving-memory-leaks-in-rust, which does not use Rc cycles, Box::leak, or static collections as you mentioned earlier but still has memory leaks.

That's not what I claimed.

That code contains an infinite loop. If you actually try to execute it, the Playground times out and forcibly shuts it down. No wonder the Drop mechanism isn't invoked. The same probably happens if you kill it by hand, or if some watchdog mechanism in your environment does so.

So no, no leaks here. Get rid of the infinite loop if you want more sensible results.
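
As a sketch of what that means (hypothetical Noisy type, not from your code): with a bounded loop the spawned future finishes, the last Arc clone is released, and Drop runs; with `loop { ... }` it never does, even though nothing is leaked in the allocator sense.

use std::sync::Arc;
use std::time::Duration;

struct Noisy;

impl Drop for Noisy {
    fn drop(&mut self) {
        println!("state dropped");
    }
}

#[tokio::main]
async fn main() {
    let state = Arc::new(Noisy);
    let task_state = state.clone();
    let handle = tokio::spawn(async move {
        // Bounded instead of `loop { ... }`, so the future completes
        // and its clone of `state` is released.
        for _ in 0..3 {
            tokio::time::sleep(Duration::from_millis(10)).await;
            let _ = &task_state;
        }
    });
    handle.await.unwrap();
    drop(state); // "state dropped" is printed here
}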


I agree that running an infinite loop on the Playground is not allowed.
But the update_state function actually lives for the entire lifetime of the web application.

tokio::spawn(update_state(id, self.state.clone(), self.store.clone()));

I will try again to figure out where the root cause comes from.

It comes from the infinite loop. If you never let the function return, then drop won't be executed. But if you are merely concerned about memory usage, then you can just ignore all of this, because the OS will clean up after your process anyway.

12MB shouldn't lead to an OOM error. So the problem is likely some memory that is still reachable, but grows indefinitely. My primary suspect would be that Arc<RwLock<HashMap<u32, MyState>>>. When do you insert something into it? In your example you only ever modify elements that are already present, but they have to come from somewhere, otherwise your application would do nothing. And do you ever remove elements from it? If you only ever insert but never remove then of course the memory usage will only grow more and more.
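
To illustrate the difference (a minimal sketch, not your code): this program never leaks in Valgrind's sense, every byte stays reachable through `cache`, yet its memory usage grows without bound until the OS kills it.

use std::collections::HashMap;

fn main() {
    let mut cache: HashMap<u64, Vec<u8>> = HashMap::new();
    let mut key = 0u64;
    loop {
        // Only ever inserting, never removing: nothing is "lost",
        // but the map (and the process) grows forever.
        cache.insert(key, vec![0u8; 1024]);
        key += 1;
    }
}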


I also think the "possibly lost" blocks are not the main cause of the memory leak.
The HashMap is initialized only once, at startup, called from main.

pub async fn start(&mut self) -> Result<()> {
    let rooms = self.store.load().await?;

    for r in rooms.iter() {
        // One txn queue and one cached state per room, plus a background
        // task that keeps the cached state fresh.
        self.txns.lock().await.insert(r.id, Vec::new());
        self.state.write().await.insert(r.id, MyState::default());
        tokio::spawn(update_state(
            r.id,
            self.state.clone(),
            self.store.clone(),
        ));
    }

    ...

    Ok(())
}

If that's all the code you have then there shouldn't be memory leaks, but clearly you haven't shown it all. For example, what do you end up doing with self.txns? Looks like it's a map of Vecs, what do you do with that? Do you only push to it or do you pop from it sometimes (and if so, do you ever shrink the Vec)? More in general: what is the state that your application holds while it's executing and when do you modify that state by adding/removing data?
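
To make the "do you ever shrink the Vec" point concrete, a small sketch (unrelated to your actual txns type): Vec::drain removes the elements but keeps the backing allocation, so the capacity stays at its high-water mark until you explicitly shrink or drop the Vec.

fn main() {
    let mut txns: Vec<u64> = Vec::new();

    // A burst of queued requests.
    txns.extend(0..1_000_000);
    println!("len = {}, cap = {}", txns.len(), txns.capacity());

    // Drain the batch: length goes back to 0, capacity does not.
    let batch: Vec<u64> = txns.drain(..).collect();
    println!("drained {}, len = {}, cap = {}", batch.len(), txns.len(), txns.capacity());

    // Only an explicit shrink returns the allocation.
    txns.shrink_to_fit();
    println!("len = {}, cap = {}", txns.len(), txns.capacity());
}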


My application is a web application using axum, and the data needed for responses is stored in Redis.
To reduce access to Redis, instead of accessing Redis on every HTTP request, the requests are queued in txns, and every millisecond the application accesses Redis and dequeues txns in FIFO order using the drain function on the Vec.

So txns grows when HTTP requests are received and shrinks every 1 ms. And even when no requests are received, the memory usage grows; that's why I didn't mention it earlier.

MyState is a cache used to respond quickly to some HTTP requests that do not require Redis access, and it is updated every 100 ms.

The only change I made was the one from the initial question:

state
    .write()
    .await
    .get_mut(&id)
    .and_then(|s| Some(*s = new_state));

Thank you for your time; I am going to check the entire code again based on your comments.

May I ask why you don’t just use redis as your cache (directly)? I get the potential upside but the strategy seems redundant.

The goal of our web application is to accept more than 1M requests per second.
We replicate the application into multiple pods on k8s. If we accessed Redis on every single request, it could become a bottleneck, so we decided to process requests in batches every 1 ms. That's not a big latency for the client.
From the perspective of Redis, every batched request can be translated into an O(1) operation with our key design. By batching requests, we reduce Redis operations from O(N) to O(P), where N is the number of HTTP requests coming from clients and P is the number of pods.

Cache is for batching, and avoiding duplicate requests from different users? I ask because if multiple nodes (I mean pods, not nodes) are behind a service, the distinction may be relevant.

I'm not 100% sure that I understand your question correctly, but I hope the explanation below will help.

Let's say Redis maintains a global counter and each client receives a unique counter value for their requests in FIFO order. By batching, instead of increasing the counter by one at a time, it can be increased by whole ranges.
The drawback of this approach is that FIFO order is guaranteed only within a single server (pod), not across all servers (pods).
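
A rough sketch of that range-based idea, with a local counter standing in for the real Redis INCRBY call (all names here are illustrative, not our actual code):

// Stand-in for the Redis-side counter; in production this would be
// a single INCRBY per batch instead of one increment per request.
struct CounterStore {
    counter: u64,
}

impl CounterStore {
    fn incr_by(&mut self, n: u64) -> u64 {
        self.counter += n;
        self.counter
    }
}

// One O(1) store operation reserves a whole range of counter values,
// which is then handed out to the queued requests in FIFO order
// (so FIFO is only guaranteed within this pod).
fn assign_counters(store: &mut CounterStore, queued: u64) -> Vec<u64> {
    let end = store.incr_by(queued);
    let start = end - queued + 1;
    (start..=end).collect()
}

fn main() {
    let mut store = CounterStore { counter: 0 };
    assert_eq!(assign_counters(&mut store, 3), vec![1, 2, 3]);
    assert_eq!(assign_counters(&mut store, 2), vec![4, 5]);
}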

Yes. That's what I was talking about (I clarified my earlier question). Because we can't be sure repeated requests will hit the same pod, we have to use some sort of shared state for the service. I was thinking of having a Redis instance on the node with the service. You'd still have more than one, but it's a local, "always ready" Redis connection that scales?

There is only one Redis cluster in our service. Every web server communicates with it.

You may be hitting the memory fragmentation problem. Are you using the default glibc memory allocator? You could profile it, and maybe do a heap dump, to see where the memory goes.
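
For example, a quick way to test the allocator hypothesis, assuming the tikv-jemallocator crate (a sketch, not your actual code):

// Cargo.toml (assumed):
// [dependencies]
// tikv-jemallocator = "0.5"

use tikv_jemallocator::Jemalloc;

// Replace the default glibc malloc with jemalloc for the whole binary.
// If the steady growth disappears, fragmentation in the default
// allocator was the likely culprit rather than a leak.
#[global_allocator]
static GLOBAL: Jemalloc = Jemalloc;

fn main() {
    // ...rest of the application unchanged...
}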
