How to properly close a TcpListener, in multi-thread server?

How can a TcpListener be closed, so that "blocked" accept() operations are aborted?

I have a multi-threaded HTTP server that I want to shut down "gracefully" when SIGINT (Ctrl+C) is received.

Here is what I tried:

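// Imports assumed by this snippet. The logging macros are assumed to come from the `log` crate;
// `Handler` and `cpu_count()` are defined elsewhere and not shown here.
use std::net::{IpAddr, SocketAddr, TcpListener, TcpStream};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use log::{error, info, warn};

impl Server {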
    pub fn new(addr: IpAddr, port: u16) -> Self {
        Self { addr, port }
    }

    pub fn run(self, handler: impl Handler + 'static) {
        let socket_addr = SocketAddr::new(self.addr, self.port);
        info!("Listening on {}", socket_addr);
        let listener = TcpListener::bind(socket_addr).unwrap();

        let stop = Arc::new(AtomicBool::new(false));
        let mut handles = Vec::new();
        let num_threads = 2usize * cpu_count();

        for _n in 0usize..num_threads {
            if let Ok(thread_listener) = listener.try_clone() {
                let my_handler = handler.clone();
                let my_stop = stop.clone();
                handles.push(thread::spawn(move || {
                    // Note: ThreadId::as_u64() is unstable (nightly-only) at the time of writing.
                    let thread_id: u64 = thread::current().id().as_u64().into();
                    info!("Thread #{thread_id} is up!");
                    loop {
                        let connection = thread_listener.accept();
                        if !my_stop.load(Ordering::Acquire) {
                            match connection {
                                Ok((stream, addr)) => Self::process_request(thread_id, &my_handler, stream, addr),
                                Err(err) => error!("Failed to establish the connection: {}", err),
                            }
                        } else {
                            break; /*shutting down */
                        }
                    }
                }));
            }
        }

        drop(ctrlc::set_handler(move || {
            warn!("Interrupted -> shutting down server !!!");
            stop.store(true, Ordering::Release);
        }));

        for handle in handles.drain(..) {
            drop(handle.join());
        }

        info!("Server is down! Goodbye...");
    }

    fn process_request(id: u64, handler: &impl Handler, mut stream: TcpStream, addr: SocketAddr) {
        /* ... */
    }
}

The problem is that the "worker" threads stay blocked in TcpListener::accept(), even after the shutdown flag has been set. Well, at least until more requests happen to come in. But that is far from reliable...

How can the main thread close the TcpListener to unblock the worker threads?

Thank you!


Note 1:

Yes, I could set the TcpListener to non-blocking mode. But that would be very inefficient :fearful:


Note 2:

I already tried to use cancellable_io, which seemed great at first glance.

Unfortunately, cancellable_io::TcpListener::try_clone() always fails! It's unusable :weary:

socket already registered!


Note 3:

socket2 has a Socket::shutdown() method. Looks like exactly what we need.

...but it does not interrupt the blocked accept() call. Only the next accept() would fail then... :weary:

I don't know if there's a good solution to your problem. However, this reminds me of what I wrote some time ago on the usability of the standard library in regard to network I/O on IRLO:

After working some more with TcpStream (and UnixStream), I came across more problems in real-life usage of these interfaces:

  • Using threads and std::io, it is not easily possible to make threads abort after a certain timeout (which is required for network applications). In order to do this, I had to implement my own wrapper, which calculates a timeout for each read or write operation.
  • […]

The opposite is the case (if you handle your connections differently). Non-blocking I/O is far more efficient than blocking I/O, because it frees your worker threads to handle other work (including other connections). What you'd usually do is have your main thread listen for new connections and delegate them as tasks to your worker threads. Practically every modern web server uses asynchronous, non-blocking I/O.

It looks to me like you are trying to build your own TCP server from scratch, maybe as a learning project. I'd really recommend you look into the tokio tutorial, which describes how to build your own server on top of a fast, asynchronous runtime (even if you don't want to use tokio, it covers a lot of background knowledge about networking). Also, tokio supports graceful shutdown.
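For what it's worth, the "one thread accepts, the workers handle" layout can also be sketched with blocking std I/O only. The names below (`Pool`, `start`, `shutdown`) are made up for illustration and are not from any library. The sketch assumes the listener is bound to an address the process can connect to itself (e.g. loopback): shutdown still relies on one "dummy" connection to wake the single acceptor, but the workers exit on their own once the channel's sender is dropped.

```rust
use std::net::{SocketAddr, TcpListener, TcpStream};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{mpsc, Arc, Mutex};
use std::thread::{self, JoinHandle};

/// Hypothetical handle to a running acceptor thread plus its worker threads.
pub struct Pool {
    stop: Arc<AtomicBool>,
    addr: SocketAddr,
    acceptor: JoinHandle<()>,
    workers: Vec<JoinHandle<usize>>,
}

/// Starts one acceptor thread and `n_workers` worker threads.
pub fn start(listener: TcpListener, n_workers: usize, handle: fn(TcpStream)) -> std::io::Result<Pool> {
    let addr = listener.local_addr()?;
    let stop = Arc::new(AtomicBool::new(false));
    let (tx, rx) = mpsc::channel::<TcpStream>();
    let rx = Arc::new(Mutex::new(rx));

    let workers: Vec<JoinHandle<usize>> = (0..n_workers)
        .map(|_| {
            let rx = Arc::clone(&rx);
            thread::spawn(move || {
                let mut served = 0usize;
                loop {
                    // The lock is held while waiting in recv() and released
                    // before the connection is handled.
                    let next = rx.lock().unwrap().recv();
                    match next {
                        Ok(stream) => {
                            handle(stream);
                            served += 1;
                        }
                        // The sender was dropped: no more work will arrive.
                        Err(_) => break served,
                    }
                }
            })
        })
        .collect();

    let acceptor_stop = Arc::clone(&stop);
    let acceptor = thread::spawn(move || {
        for conn in listener.incoming() {
            if acceptor_stop.load(Ordering::Acquire) {
                break; // woken up by the dummy connection during shutdown
            }
            if let Ok(stream) = conn {
                if tx.send(stream).is_err() {
                    break; // all workers are gone
                }
            }
        }
        // `tx` is dropped here, which makes every worker's recv() fail.
    });

    Ok(Pool { stop, addr, acceptor, workers })
}

impl Pool {
    /// Stops accepting, waits for all threads, returns the number of served connections.
    /// Caveat: if the dummy connection cannot be made, the join below blocks.
    pub fn shutdown(self) -> usize {
        self.stop.store(true, Ordering::Release);
        let _ = TcpStream::connect(self.addr); // wakes the single blocked accept()
        let _ = self.acceptor.join();
        self.workers.into_iter().map(|w| w.join().unwrap_or(0)).sum()
    }
}
```

A Ctrl+C handler would then only need to call `shutdown()` once, instead of poking every worker thread individually.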

I also switched to tokio and don't use the standard library for networking anymore. However, asynchronous Rust has a steep learning curve in my opinion. At least I struggled a bit. But I would still recommend considering tokio.

Well, what I mean is that TcpListener::accept() on a non-blocking socket is very inefficient, because we'd have to call it many times in a loop until it succeeds eventually. Just like a "busy" waiting. Instead, on a "normal" blocking socket, accept() only returns after we actually have a connection (or an error).

I think this is the same, regardless of whether we use a single thread to accept all incoming connections and then pass them to the workers, or whether we let all the workers accept the incoming connections themselves. Either way, "busy" waiting on a non-blocking socket would be highly inefficient...

Using accept() on a blocking socket works fine. The only problem is how to "unblock" the waiting thread(s) when the server is supposed to shut down. For example, in Java or C/C++ we can have a blocking ServerSocket.accept() with a timeout. I'm missing this mechanism in Rust...

You don't spin indefinitely, but instead you put your thread asleep until a new connection is available (or the request to gracefully shutdown has been received, whichever comes first).

Which is why I added

Nonetheless, there is no way in the standard library to use a JoinHandle to abort its thread. You can only wait for it to finish with join. But it will never finish, unless you get another connection and jump out of the blocking accept(). So I think you have to look for another solution or wait for the OS to cleanup your sockets after aborting your program with CTRL+C.

Yeah, that is exactly how TcpListener::accept() behaves on a blocking socket. If the socket has been set to non-blocking mode, accept() always returns immediately, so that "spinning" is required.

With the non-blocking behavior, we can check the "stop" flag continuously and break from the loop as soon as the flag was set, but it's highly inefficient. With the blocking behavior we don't waste any CPU cycles on "spinning". Unfortunately, as it seems, Rust does not have a way to either abort a blocking accept() from another thread (e.g. by closing the socket), or to specify a reasonable timeout for the accept() call.
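To make the trade-off concrete, here is a minimal std-only sketch of the non-blocking variant that sleeps between attempts instead of spinning flat out. The function name `accept_until_stopped` and the `poll_interval` parameter are made up for this example. It is a compromise rather than a real fix: CPU usage stays low, but both new connections and the shutdown request are only noticed after up to one interval.

```rust
use std::io::ErrorKind;
use std::net::{TcpListener, TcpStream};
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
use std::time::Duration;

/// Accepts connections until `stop` is set; returns how many were handled.
pub fn accept_until_stopped(
    listener: &TcpListener,
    stop: &AtomicBool,
    poll_interval: Duration,
    mut on_conn: impl FnMut(TcpStream),
) -> std::io::Result<usize> {
    // accept() now returns WouldBlock instead of blocking.
    listener.set_nonblocking(true)?;
    let mut handled = 0;
    while !stop.load(Ordering::Acquire) {
        match listener.accept() {
            Ok((stream, _addr)) => {
                // On some platforms the accepted stream inherits the
                // non-blocking mode, so switch it back explicitly.
                stream.set_nonblocking(false)?;
                on_conn(stream);
                handled += 1;
            }
            // Nothing pending: sleep a little rather than busy-wait.
            Err(e) if e.kind() == ErrorKind::WouldBlock => thread::sleep(poll_interval),
            Err(e) => return Err(e),
        }
    }
    Ok(handled)
}
```

With an interval of, say, 50 ms, each worker wakes up about 20 times per second, which is cheap but still adds latency to every accepted connection.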

What you do is you must check the socket condition and the shutdown condition simultaneously. In C you could do that by creating a local socket pair for notifying the listening thread and then select/poll on both the network socket and the local (internal) socket.

With tokio, you can use tokio::select! to wait for one of two (or more) futures being available without wasting CPU time. But you can also just cancel an async task.

Not necessarily, I think. It would be sufficient, if we could set a proper timeout for the blocking accept() call. This allows us to check the "stop" flag at reasonable interval (e.g. every 10 secs) while still efficiently waiting on incoming connections.

In C/C++, we could do this by setting a receive timeout (SO_RCVTIMEO) on the socket, which on Linux also applies to accept(). And, in Java, we could use the ServerSocket.setSoTimeout() method.

Yes, that is another alternative. Though it's a bit of a workaround because shutdown is unnecessarily delayed and doesn't happen instantly.

Maybe you can set a timeout using the libc crate? I don't like that solution though. (Which is why I prefer using a better suited I/O library, such as tokio.)

Maybe you can set a timeout using the libc crate?

Actually, I looked at solution with libc before. Unfortunately, it requires a "raw" file descriptor, which we need to get from the TcpListener via as_raw_fd() from std::os::unix::io::AsRawFd trait.

This doesn't seem to be available on Windows.

(I'm working primarily on Windows, but want to stick with "portable" solution as much as possible)

Which is why I prefer using a better suited I/O library, such as tokio.

At this point I'd like to learn standard I/O, but will probably have a look at tokio another day :exploding_head:

If you don't want to do async, maybe there is a different cross-platform synchronous I/O library around, which supports setting timeouts? I don't know of any, but perhaps someone else does?

I'd be happy for any suggestions, though a pure std solution is preferable.

I still wonder: What is the intended use case of std::net::TcpListener? I mean, TcpListener::accept() will almost always be called in a loop, in order to process a series of incoming connections. But how can the program ever exit cleanly, after the last request (or on request), if it is neither possible to set a timeout for accept() nor to close the TcpListener from another thread?

Would seem like a major "shortcoming" in the net API :thinking:

(Alternative I/O libraries are great, but shouldn't net::TcpListener be "complete" in its own right?)


Anyway, here is a poor man's solution, using "dummy" connections to un-block the threads:

pub fn run(self, handler: impl Handler + 'static) {
	let socket_addr = SocketAddr::new(self.addr, self.port);
	info!("Listening on {}", socket_addr);
	let listener = TcpListener::bind(socket_addr).unwrap();

	let stop = Arc::new(AtomicBool::new(false));
	let mut thread_handles = Vec::new();
	let num_threads = 2usize * cpu_count();

	for _n in 0usize..num_threads {
		if let Ok(thread_listener) = listener.try_clone() {
			let my_handler = handler.clone();
			let my_stop = stop.clone();
			thread_handles.push(thread::spawn(move || {
				let thread_id = thread_id();
				info!("Thread #{} is up!", thread_id);
				loop {
					let connection = thread_listener.accept();
					if !my_stop.load(Ordering::Acquire) {
						match connection {
							Ok((stream, addr)) => Self::process_request(thread_id, &my_handler, stream, addr),
							Err(err) => error!("Failed to establish the connection: {}", err),
						}
					} else {
						info!("Thread #{thread_id} is going down!");
						break; /*shutting down */
					}
				}
			}));
		}
	}

	let listeners_count: usize = thread_handles.len();
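	drop(ctrlc::set_handler(move || {
		warn!("Interrupted -> initiating server shutdown !!!");
		// Set the flag first, so that every thread woken up below sees it.
		stop.store(true, Ordering::Release);
		if let Ok(local_addr) = listener.local_addr() {
			// One "dummy" connection per worker thread: each connection is consumed by one accept() call.
			for _n in 0..listeners_count {
				// Retry up to 16 times in case the connection attempt fails.
				for _m in 0..16 {
					if TcpStream::connect_timeout(&local_addr, Duration::from_secs(10)).is_ok() {
						debug!("Listener closed!");
						break;
					}
					thread::sleep(Duration::from_millis(100));
				}
			}
		}
	}));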

	for handle in thread_handles.drain(..) {
		if handle.join().is_ok() {
			debug!("Thread terminated!");
		}
	}

	info!("Server is going down! Goodbye...");
}

Why do you care about unblocking the thread? What is the disadvantage of exiting with the listener still listening up until the time you exit?

Why do you care about unblocking the thread? What is the disadvantage of exiting with the listener still listening up until the time you exit?

In this little "demo" program probably nothing seriously bad would happen.

But it's a good practice to not terminate the application until all threads have exited "gracefully".

For example, if we don't properly join() the threads (which we can't do if we are unable to un-block the threads from accept()), then we don't know which "state" those threads are in at the moment the application terminates: The thread might currently be blocked in accept(), but we don't know this for sure. The thread might just as well be in the middle of processing a request...

Why?

I don't see how that's possible if you design the graceful shutdown appropriately. Either the thread was already processing a request at the point the shutdown started (in which case you'd presumably know about it), or it was blocked in accept, in which case it can find out the server's shutting down immediately when accept returns and kill the connection.

Track requests, not threads.

Why?

For the reason I already pointed out: If you just terminate the application without ensuring that all threads have exited "gracefully", the threads will be killed in an unknown state. The thread might be in the middle of doing something, like writing a file to disk, sending some data to a client, and so on.

I don't see how that's possible if you design the graceful shutdown appropriately. Either the thread was already processing a request at the point the shutdown started (in which case you'd presumably know about it), or it was blocked in accept, in which case it can find out the server's shutting down immediately when accept returns and kill the connection.

Thing is, the shutdown signal may be received at any point in time. If we simply terminate the application (including all threads) as soon as the shutdown signal is received, then we don't know in which state the worker threads are. Very possible that we kill a thread while it is in the middle of processing a request!

Only way to avoid this and to actually have a "graceful" shutdown is to not immediately terminate the application when the shutdown signal is received, but instead set the "stop" flag for the threads, so that the threads will exit "gracefully" as soon as they get the chance to inspect the "stop" flag. That and waiting until the threads have actually exited, by calling JoinHandle::join(), before terminating.

Unfortunately, if a thread might be blocked indefinitely in accept(), because the Rust implementation of accept() has no way to set up a timeout, or otherwise interrupt the call from the "main" thread, then it's just not possible to implement a "graceful" shutdown. JoinHandle::join() is likely to wait forever...

Joining all threads is not the only way to perform a graceful shutdown. You don't need to wait until all threads have exited, you just need to wait until all threads are in a "safe" state.
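One way to read "safe state" (and "track requests, not threads") is to count in-flight requests and have the main thread wait for that count to reach zero, instead of joining workers that may still be parked in accept(). Below is a minimal sketch of that idea. `Tracker`, `RequestGuard`, `begin_request` and `shutdown_and_wait` are hypothetical names, not from the thread or from any library. A worker registers the request first and only then checks the stop flag, so a request can no longer start once the main thread has observed a count of zero.

```rust
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

/// Shared shutdown state (hypothetical type for this sketch).
#[derive(Default)]
pub struct Tracker {
    stop: AtomicBool,
    in_flight: AtomicUsize,
}

/// While a guard is alive, one request counts as "in flight".
pub struct RequestGuard(Arc<Tracker>);

impl Drop for RequestGuard {
    fn drop(&mut self) {
        self.0.in_flight.fetch_sub(1, Ordering::SeqCst);
    }
}

impl Tracker {
    /// Called by a worker right after accept() returns.
    /// Returns None if shutdown has begun; the connection should then be dropped.
    pub fn begin_request(tracker: &Arc<Tracker>) -> Option<RequestGuard> {
        // Register first, then check the flag. Together with the SeqCst
        // ordering this closes the window in which a request could start
        // after the main thread has already seen "no requests in flight".
        tracker.in_flight.fetch_add(1, Ordering::SeqCst);
        let guard = RequestGuard(Arc::clone(tracker));
        if tracker.stop.load(Ordering::SeqCst) {
            None // `guard` is dropped on return, undoing the increment
        } else {
            Some(guard)
        }
    }

    /// Number of requests currently being processed.
    pub fn in_flight(&self) -> usize {
        self.in_flight.load(Ordering::SeqCst)
    }

    /// Called by the main thread: refuse new requests, then wait for running ones.
    pub fn shutdown_and_wait(&self) {
        self.stop.store(true, Ordering::SeqCst);
        while self.in_flight() != 0 {
            thread::sleep(Duration::from_millis(10));
        }
    }
}
```

In a worker loop this would look roughly like `if let Some(_guard) = Tracker::begin_request(&tracker) { /* process the request */ }`. Once `shutdown_and_wait()` returns, any thread still blocked in accept() is idle, so exiting the process without joining it does not cut a request short.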

Joining all threads is not the only way to perform a graceful shutdown. You don't need to wait until all threads have exited, you just need to wait until all threads are in a "safe" state.

And how? From the point of view of the "main" thread, a worker thread is either "still running" or it has exited. We call JoinHandle::join() to ensure that the thread has exited.

If the thread is still running, how do you check whether the thread currently is in a "safe" state ???

Even more important: Even if, at some point in time, you somehow have determined that the worker thread is in a "safe" state right now, the thread still might go from that "safe" state into an "unsafe" state just a nanosecond later. Or, in other words, as long as the worker thread is still running, how do you ensure the thread actually remains in the "safe" state until the application has fully terminated?