How to shut down a socket

In my application I'm using Unix domain sockets for local communication, via the uds crate. I need to shut down the sockets (both listeners and clients, selectively), but with my current code design I'm not able to. Can somebody help me here?

main.rs

mod uds_listener;
mod uds_client;

use std::thread::JoinHandle;

use uds_listener::UdsListener;
use uds_client::UdsClient;
    
fn main() {
    let mut uds_listeners_thread_join_handle_list: Vec<JoinHandle<()>> = Vec::new();
    for i in 0..256 {
        let uds_listener = UdsListener::new(&format!("/tmp/sock_{i}.sock"));
        let uds_listener_jh = uds_listener.spawn_thread();
        uds_listeners_thread_join_handle_list.push(uds_listener_jh);
    }

    let mut uds_clients_thread_join_handle_list: Vec<JoinHandle<()>> = Vec::new();
    for i in 0..256 {
        let uds_client = UdsClient::new(&format!("/tmp/sock_{i}.sock"));
        let uds_client_jh = uds_client.spawn_thread();
        uds_clients_thread_join_handle_list.push(uds_client_jh);
    }

    // But how do I ask threads that are blocked on data reception to terminate?
    // One way is to shut down the socket (both read and write, which makes recv return length 0),
    // but that's not possible with my current design because the socket is owned by the thread.
    println!("waiting for listener threads to join");
    while let Some(ljh) = uds_listeners_thread_join_handle_list.pop() {
        let _ = ljh.join();
    }

    println!("waiting for client threads to join");
    while let Some(cjh) = uds_clients_thread_join_handle_list.pop() {
        let _ = cjh.join();
    }
}

uds_listener.rs

use std::thread;

use uds::UnixSeqpacketListener;

pub struct UdsListener {
   listener: UnixSeqpacketListener,
}

impl UdsListener {
   pub fn new(sock_name: &str) -> Self {
      Self {
         listener: Self::bind(sock_name),
      }
   }

   fn bind(sock_name: &str) -> UnixSeqpacketListener {
      match UnixSeqpacketListener::bind(sock_name) {
         Ok(listener) => listener,
         Err(e) => panic!("uds listener bind failed for {sock_name}: {e:?}"),
      }
   }

   pub fn spawn_thread(mut self) -> thread::JoinHandle<()> {
      thread::spawn(move || {
         self.run();
      })
   }

   fn run(&mut self) {
      if let Ok((socket, _)) = self.listener.accept_unix_addr() {
         let mut buff = [0u8; 8192];
         loop {
            match socket.recv(&mut buff) {
               Ok(length) => {
                  if length == 0 {
                     break;
                  }
               },
               Err(e) => panic!("uds listener read failed: {e:?}"),
            }
         }
      }
   }
}

uds_client.rs

use std::{time, thread};

use uds::UnixSeqpacketConn;

pub struct UdsClient {
   socket: UnixSeqpacketConn,
}

impl UdsClient {
   pub fn new(sock_name: &str) -> Self {
      Self {
         socket: Self::connect(sock_name),
      }
   }

   fn connect(sock_name: &str) -> UnixSeqpacketConn {
      loop {
         match UnixSeqpacketConn::connect(sock_name) {
            Ok(socket) => return socket,
            Err(_) => thread::sleep(time::Duration::from_secs(1)),
         }
      }
   }

   pub fn spawn_thread(mut self) -> thread::JoinHandle<()> {
      thread::spawn(move || {
         self.run();
      })
   }

   fn run(&mut self) {
      let mut buff = [0u8; 8192];
      loop {
         match self.socket.recv(&mut buff) {
            Ok(length) => {
               if length == 0 {
                  break;
               }
            },
            Err(e) => panic!("uds client read failed: {e:?}"),
         }
      }
   }
}

Cargo.toml

[package]
name = "udss"
version = "0.1.0"
edition = "2021"

[dependencies]
uds = "0.4.2"

What does "I'm not able to" mean?

  1. You don't know what code to write to shut down the sockets?
  2. You wrote the code to shut down the sockets, but you get a compile error -- in that case post the full error.
  3. You wrote the code to shut down the sockets, but you believe they are not shutting down -- in that case what behavior do you see?
  4. You wrote the code to shut down the sockets, but you get a runtime error -- in that case post the full error.

I know what code to write to shut down a socket, e.g.:

socket.shutdown(Shutdown::Both)

But the problem is that, with my current code design, I can't access the socket (of type UnixSeqpacketConn) from another thread. That's where I'm stuck.

I'll make some assumptions, you can correct me if I'm wrong:

  • You want to trigger shutdown of the sockets in main(), or at least in the main thread.
  • When you close a socket, you also want to stop the socket's thread.

One very simple approach is to use an Arc<AtomicBool> keep_running flag that is shared between the main thread and all the client threads. Each client thread checks it on every iteration of its loop; if it is false, the thread closes the socket and exits the loop. The main thread initializes keep_running to true and sets it to false when you want to stop the sockets. The Arc is cloned for each client thread, which is what allows it to be shared. When you create a client thread, pass the cloned Arc to that thread's constructor.

If instead you want to control each client thread separately, you can create a separate Arc<AtomicBool> for each one.

Here are some code snippets (not compiled).

To create the Arc and clone it in the main thread:

use std::sync::{
    atomic::{AtomicBool, Ordering},
    Arc,
};

// Create the flag:
let keep_running = Arc::new(AtomicBool::new(true));

// When creating a client thread:
let uds_client = UdsClient::new(keep_running.clone() /* ... */);

// To stop the client threads:
keep_running.store(false, Ordering::Relaxed);

To use the Arc in the client thread:

// Add the flag field to the UdsClient struct, and set it during construction:
keep_running: Arc<AtomicBool>,

// Check the flag at the top of the client loop:
loop {
    if !self.keep_running.load(Ordering::Relaxed) {
        // TODO: shutdown socket
        break;
    }
    // ...
}
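
For the per-thread variant mentioned above, here is a minimal sketch that does compile with std only. Worker and spawn_workers are hypothetical names, and Worker is a stand-in for UdsClient with the socket left out: the sleep marks where a receive call that returns periodically would go. Each worker gets its own flag, so the main thread can stop them selectively.

```rust
use std::sync::{
    atomic::{AtomicBool, Ordering},
    Arc,
};
use std::thread::{self, JoinHandle};
use std::time::Duration;

// Hypothetical stand-in for UdsClient; the real struct would also own the socket.
struct Worker {
    keep_running: Arc<AtomicBool>,
}

impl Worker {
    fn new(keep_running: Arc<AtomicBool>) -> Self {
        Self { keep_running }
    }

    // Runs until the flag is cleared; returns the number of loop iterations.
    fn spawn_thread(self) -> JoinHandle<u64> {
        thread::spawn(move || {
            let mut iterations = 0u64;
            while self.keep_running.load(Ordering::Relaxed) {
                // Placeholder for a receive call that returns periodically.
                thread::sleep(Duration::from_millis(5));
                iterations += 1;
            }
            iterations
        })
    }
}

// Spawns `count` workers, each with its own flag, and returns (flag, handle) pairs.
fn spawn_workers(count: usize) -> Vec<(Arc<AtomicBool>, JoinHandle<u64>)> {
    (0..count)
        .map(|_| {
            let flag = Arc::new(AtomicBool::new(true));
            let handle = Worker::new(Arc::clone(&flag)).spawn_thread();
            (flag, handle)
        })
        .collect()
}

fn main() {
    let mut workers = spawn_workers(4);

    // Stop only one worker; the others keep running.
    let (flag, handle) = workers.remove(2);
    flag.store(false, Ordering::Relaxed);
    handle.join().expect("worker panicked");
    println!("one worker stopped; {} still running", workers.len());

    // Stop the rest.
    for (flag, handle) in workers {
        flag.store(false, Ordering::Relaxed);
        let _ = handle.join();
    }
}
```

Keeping the flag next to the JoinHandle means the main thread always knows which flag belongs to which thread. A shared flag is simpler if you only ever stop everything at once.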

You also need some way to keep the client thread from blocking forever in recv when no packets are coming in, since it would then never check keep_running. I haven't used domain sockets so I'm not sure this is correct, but you probably want to set a read timeout so that recv wakes up periodically to check the flag.
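
To illustrate the timeout idea, here is a sketch that uses std's UnixStream as a stand-in for the uds crate's UnixSeqpacketConn (that substitution is my assumption; check the uds docs for the equivalent timeout setter on the seqpacket type). spawn_reader is a hypothetical helper. The flag is checked after every receive attempt, whether it returned data or timed out.

```rust
use std::io::{ErrorKind, Read, Write};
use std::os::unix::net::UnixStream;
use std::sync::{
    atomic::{AtomicBool, Ordering},
    Arc,
};
use std::thread::{self, JoinHandle};
use std::time::Duration;

// Reads until the peer closes the socket or the flag is cleared.
// Returns the total number of bytes received.
fn spawn_reader(mut socket: UnixStream, keep_running: Arc<AtomicBool>) -> JoinHandle<usize> {
    thread::spawn(move || {
        // Make read() give up after 50 ms instead of blocking forever.
        socket
            .set_read_timeout(Some(Duration::from_millis(50)))
            .expect("setting a non-zero read timeout failed");
        let mut buff = [0u8; 8192];
        let mut total = 0usize;
        loop {
            match socket.read(&mut buff) {
                Ok(0) => break, // peer closed or socket was shut down
                Ok(length) => total += length,
                // A timeout surfaces as WouldBlock or TimedOut depending on the platform.
                Err(e)
                    if matches!(
                        e.kind(),
                        ErrorKind::WouldBlock | ErrorKind::TimedOut | ErrorKind::Interrupted
                    ) => {}
                Err(e) => panic!("read failed: {e:?}"),
            }
            // Checked after every receive attempt, so a quiet socket still notices the flag.
            if !keep_running.load(Ordering::Relaxed) {
                break;
            }
        }
        total
    })
}

fn main() {
    // A connected pair avoids needing a socket file on disk for the demo.
    let (reader_end, mut writer_end) = UnixStream::pair().expect("socketpair failed");
    let keep_running = Arc::new(AtomicBool::new(true));
    let handle = spawn_reader(reader_end, Arc::clone(&keep_running));

    writer_end.write_all(b"hello").expect("write failed");
    keep_running.store(false, Ordering::Relaxed);

    let total = handle.join().expect("reader panicked");
    println!("reader stopped after {total} bytes");
}
```

The timeout bounds how long a stop request can take to be noticed: a shorter value reacts faster but wakes the thread more often.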
