Hello everyone. Recently I started playing with Rust. Trying to understand how it works. After Kotlin, Typescript and Go it become headache for me ) I wrote small app which reads messages from channel and wrote them into file. I receive unexpected for me memory usage. Code below. If anyone can explain me what I doing wrong, I will be very grateful.
use std::sync::mpsc::{Sender, Receiver};
use std::sync::mpsc;
use std::thread;
use std::io::{Write};
use std::fs::File;
use std::fs::OpenOptions;
use jemalloc_ctl::{stats, epoch};
const MSG_NUM: usize = 10000000;
const BUFFER_SIZE: usize = 1000;
#[global_allocator]
static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc;
fn main() {
let e = epoch::mib().unwrap();
let allocated = stats::allocated::mib().unwrap();
let resident = stats::resident::mib().unwrap();
let (sender, receiver): (Sender<Vec<u8>>, Receiver<Vec<u8>>) = mpsc::channel();
let (buffered_sender, buffered_receiver): (Sender<Vec<Message>>, Receiver<Vec<Message>>) = mpsc::channel();
{
for _ in 0..MSG_NUM {
match sender.send(String::from("Hello World!").into_bytes()) {
Ok(_) => continue,
Err(err) => {
println!("Error {}", err);
continue
},
}
}
drop(sender)
}
e.advance().unwrap();
println!("Step 1. {} MB allocated. {} MB resident", allocated.read().unwrap() / 1000000, resident.read().unwrap() / 1000000);
{
let mut buffer: Vec<Message> = Vec::new();
loop {
let next_msg = match receiver.recv() {
Ok(msg) => msg,
Err(_) => {
println!("Channel closed for \"receiver\".");
break;
}
};
buffer.push(Message {bytes: next_msg});
if buffer.len() == BUFFER_SIZE {
match buffered_sender.send(buffer.clone()) {
Ok(_) => {},
Err(err) => {
println!("Error: {}", err);
continue;
}
}
buffer.clear()
}
}
drop(buffered_sender);
e.advance().unwrap();
println!("Step 2. Excpected to see same amount of memory like in Step 1, but was: {} MB allocated. {} MB resident", allocated.read().unwrap() / 1000000, resident.read().unwrap() / 1000000);
};
thread::spawn(move || {
let mut file = OpenOptions::new().create(true).append(true).open("foo.txt").unwrap();
loop {
match buffered_receiver.recv() {
Ok(messages) => {
on_msg(messages, &mut file);
},
Err(_) => {
println!("Channel closed for \"buffered_receiver\".");
break;
}
};
}
e.advance().unwrap();
println!("Step 3. Excpected to see around 0 MB allocated, but was: {} MB allocated. {} MB resident", allocated.read().unwrap() / 1000000, resident.read().unwrap() / 1000000);
});
loop {
}
}
fn on_msg(buffer: Vec<Message>, file: &mut File) {
let mut bytes: Vec<u8> = Vec::new();
for msg in buffer.iter() {
bytes.extend(msg.bytes.iter());
}
let _ = file.write(&*bytes);
}
#[derive(Clone)]
struct Message {
bytes: Vec<u8>
}
Execution result:
Step 1. 640 MB allocated. 653 MB resident
Channel closed for "receiver".
Step 2. Excpected to see same amount of memory like in Step 1, but was: 886 MB allocated. 942 MB resident
Channel closed for "buffered_receiver".
Step 3. Excpected to see around 0 MB allocated, but was: 480 MB allocated. 880 MB resident