Huge memory consumption

In the following code, I set up a work queue and submit 10_0000_0000 jobs to it. Five threads drain the queue. If I add line A, memory is exhausted very quickly. Does this mean the memory is not freed as soon as possible? How can I address this? Many thanks.

use std::thread;
use std::sync::mpsc::channel;
use std::sync::{Arc, Mutex};
use crossbeam_channel::unbounded;
fn main() {
  struct Chan<TX,RX> {
    tx : TX,
    rx : RX,
  }

  let wq = {
    let (tx,rx) = unbounded();
    Chan{tx,rx}
  };

  let mq = {
    let (tx,rx) = channel();
    Chan{tx,rx}
  };

  let mu = Arc::new(Mutex::new(0u32));
  let submit = |count| {
    for i in 1..=count {
      let m = mu.clone();
      wq.tx.send(move||{
        *m.lock().unwrap() += 1;
        println!("job {} finished",i);  // line A: memory consumption differs greatly
                                        // with and without this line
      });
    }
  };
  
  for i in 1..=5 {
    let r = wq.rx.clone();
    let s = mq.tx.clone();
    thread::spawn(move || {
      println!("m{} started",i);
      let count = r.iter().inspect(|x| x()).count() as u32;
      s.send((i,count)).unwrap();
    });
  }

  let count = 10_0000_0000;
  submit(count);
  
  drop(wq.tx);

  drop(mq.tx);
  let total : u32 = mq.rx.iter()
  .inspect(|(i,c)| println!("m{} exited, jobs={}",i,c))
  .map(|(_,c)| c)
  .sum();
  println!("total jobs {}",total);
  println!("{}",*mu.lock().unwrap());
}

This is probably due to your use of an unbounded channel. If the threads can't keep up with the sender, pending messages pile up in the queue and fill your memory. The println makes each job slow enough that the threads can't keep up.

Use a bounded channel, so that the sender blocks when the queue is full.
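
As a rough illustration of the bounded approach, here is a minimal sketch. To stay dependency-free it swaps in the standard library's `sync_channel` rather than crossbeam's bounded channel, and shares the single receiver behind a `Mutex` because std's `Receiver` cannot be cloned. The helper name `run_bounded` and its parameters are hypothetical and not part of the original code.

```rust
use std::sync::mpsc::sync_channel;
use std::sync::{Arc, Mutex};
use std::thread;

// Hypothetical helper: pushes `jobs` closures through a queue that holds at
// most `capacity` pending jobs, drained by `workers` threads.
// Returns the final value of the shared counter.
fn run_bounded(jobs: u32, workers: usize, capacity: usize) -> u32 {
    type Job = Box<dyn FnOnce() + Send>;

    // sync_channel is std's bounded channel: send() blocks while the queue is full.
    let (tx, rx) = sync_channel::<Job>(capacity);
    // std's Receiver cannot be cloned, so the workers share it behind a Mutex.
    let rx = Arc::new(Mutex::new(rx));
    let counter = Arc::new(Mutex::new(0u32));

    let handles: Vec<_> = (0..workers)
        .map(|_| {
            let rx = Arc::clone(&rx);
            thread::spawn(move || loop {
                // The lock guard is dropped at the end of this statement,
                // so the job itself runs without holding the lock.
                let job = rx.lock().unwrap().recv();
                match job {
                    Ok(job) => job(),
                    Err(_) => break, // sender dropped and queue drained
                }
            })
        })
        .collect();

    for _ in 0..jobs {
        let c = Arc::clone(&counter);
        // Blocks here whenever `capacity` jobs are already waiting.
        tx.send(Box::new(move || {
            *c.lock().unwrap() += 1;
        }))
        .unwrap();
    }
    drop(tx); // close the channel so the workers can exit

    for h in handles {
        h.join().unwrap();
    }
    let total = *counter.lock().unwrap();
    total
}

fn main() {
    println!("total jobs {}", run_bounded(100_000, 5, 1024));
}
```

With a queue like this, the number of pending closures never exceeds `capacity`, so memory use stays flat no matter how slow each job is. The trade-off is that the submitting thread stalls whenever the workers fall behind.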


Cool, got it.
