Multiple-Threads Graceful Shutdown

I've been trying my best as a beginner to write a multi-threaded program then shutdown all threads gracefully without implementing Drop trait or just calling drop(T) or creating an expression that causes error intentionally. Code below is an implementation of what I wanted, it compiles and works fine but I don't have depth knowledge to detect any runtime problem

use std::{thread, time::{Duration, Instant}, sync::{Arc, Mutex, RwLock, mpsc}};

fn main() {
  let (sender, receiver) = mpsc::channel::<String>();
  let receiver = Arc::new(Mutex::new(receiver));
  let signal_stop = Arc::new(RwLock::new(false));
  let mut threads = Vec::with_capacity(6);
  
  let signal_stop_ref_1 = Arc::clone(&signal_stop);
  threads.push(thread::spawn(move || { // thread sending
    let start_time = Instant::now();
    loop {
      let duration = start_time.elapsed().as_millis();
      sender.send(format!("{duration} milliseconds")).unwrap();
      if duration >= 30_000 {
        let mut signal_stop = signal_stop_ref_1.write().unwrap();
        *signal_stop = true;
        break; // stop thread
      }
      thread::sleep(Duration::from_secs(1));
    }
  }));
  
  for _ in 0..5 {
    let (receiver, signal_stop_ref_2) = (
      Arc::clone(&receiver),
      Arc::clone(&signal_stop)
    );
    threads.push(thread::spawn(move || { // 5 threads receiving
      loop {
        let message = receiver.lock().unwrap().recv();
        if *(signal_stop_ref_2.read().unwrap()) && message.is_err() {
          break; // stop thread if conditions met
        }
        if let Ok(msg) = message {
          println!("received after {}", msg);
        }
      }
    }));
  }
  
  for thread in threads {
    thread.join().unwrap();
  }
  println!("Flow back to main");
}

When the threads wait for channels, one way to implement shutdown is to make use of the fact that when all senders are destroyed, the receiver starts returning errors when you receive messages. You can use this instead of a separate shutdown signal.

Putting an mpsc receiver into an Arc/Mutex is usually a design mistake. Either remove the Arc/Mutex, or if you actually need multiple receivers, use a channel like crossbeam that supports this.

2 Likes

I was following a tutorial about creating multi-threaded web server but it was shutting down just after a single connection. I'm sure that happened because every case of RecvError was handled by a break. I didn't like the behaviour

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.