Understanding of memory allocation

Hello everyone. Recently I started playing with Rust. Trying to understand how it works. After Kotlin, Typescript and Go it become headache for me ) I wrote small app which reads messages from channel and wrote them into file. I receive unexpected for me memory usage. Code below. If anyone can explain me what I doing wrong, I will be very grateful.

use std::sync::mpsc::{Sender, Receiver};
use std::sync::mpsc;
use std::thread;
use std::io::{Write};
use std::fs::File;
use std::fs::OpenOptions;
use jemalloc_ctl::{stats, epoch};

const MSG_NUM: usize = 10000000;
const BUFFER_SIZE: usize = 1000;

#[global_allocator]
static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc;

fn main() {

    let e = epoch::mib().unwrap();
    let allocated = stats::allocated::mib().unwrap();
    let resident = stats::resident::mib().unwrap();

    let (sender, receiver): (Sender<Vec<u8>>, Receiver<Vec<u8>>) = mpsc::channel();
    let (buffered_sender, buffered_receiver): (Sender<Vec<Message>>, Receiver<Vec<Message>>) = mpsc::channel();

    {
        for _ in 0..MSG_NUM {
            match sender.send(String::from("Hello World!").into_bytes()) {
                Ok(_) => continue,
                Err(err) => {
                    println!("Error {}", err);
                    continue
                },
            }
        }
        drop(sender)
    }

    e.advance().unwrap();
    println!("Step 1. {} MB allocated. {} MB resident", allocated.read().unwrap() / 1000000, resident.read().unwrap() / 1000000);

    {
        let mut buffer: Vec<Message> = Vec::new();

        loop {

            let next_msg = match receiver.recv() {
                Ok(msg) => msg,
                Err(_) => {
                    println!("Channel closed for \"receiver\".");
                    break;
                }
            };

            buffer.push(Message {bytes: next_msg});

            if buffer.len() == BUFFER_SIZE {
                match buffered_sender.send(buffer.clone()) {
                    Ok(_) => {},
                    Err(err) => {
                        println!("Error: {}", err);
                        continue;
                    }
                }
                buffer.clear()
            }
        }

        drop(buffered_sender);

        e.advance().unwrap();
        println!("Step 2. Excpected to see same amount of memory like in Step 1, but was: {} MB allocated. {} MB resident", allocated.read().unwrap() / 1000000, resident.read().unwrap() / 1000000);

    };

    thread::spawn(move || {
        let mut file = OpenOptions::new().create(true).append(true).open("foo.txt").unwrap();

        loop {
            match buffered_receiver.recv() {
                Ok(messages) => {
                    on_msg(messages, &mut file);
                },
                Err(_) => {
                    println!("Channel closed for \"buffered_receiver\".");
                    break;
                }
            };
        }

        e.advance().unwrap();
        println!("Step 3. Excpected to see around 0 MB allocated, but was: {} MB allocated. {} MB resident", allocated.read().unwrap() / 1000000, resident.read().unwrap() / 1000000);
    });

    loop {

    }
}

fn on_msg(buffer: Vec<Message>, file: &mut File) {
    let mut bytes: Vec<u8> = Vec::new();
    for msg in buffer.iter() {
        bytes.extend(msg.bytes.iter());
    }
    let _ = file.write(&*bytes); 
}

#[derive(Clone)]
struct Message {
    bytes: Vec<u8>
}

Execution result:
Step 1. 640 MB allocated. 653 MB resident
Channel closed for "receiver".
Step 2. Excpected to see same amount of memory like in Step 1, but was: 886 MB allocated. 942 MB resident
Channel closed for "buffered_receiver".
Step 3. Excpected to see around 0 MB allocated, but was: 480 MB allocated. 880 MB resident

Here's a short list of things that I can spot in the code (without a full analysis):

  • Liberal use of Vec<_> is good for avoiding the borrow checker, but because it's heap-allocated, you can expect clones to consume more memory than necessary.
  • You are cloning where it probably make more sense to use slices. on_msg for example; just write each buffer individually instead of cloning them all into one super-vector. (You can also look into using vectored I/O for this: Write::write_vectored)
  • mpsc::channel is unbounded, and you are shoving 10 million messages into them, each owning heap-allocated vectors. Use mpsc::sync_channel or alternative channel implementations like crossbeam_channel::bounded to create a bounded channel that limits the amount of messages it will buffer. The top-level channels are most likely where all of the memory is being used. The internal buffers grow unbounded, and the buffer will never shrink. You can only drop these to free the memory they own.
2 Likes

This topic was automatically closed 90 days after the last reply. New replies are no longer allowed.