How to properly close a TcpListener in a multi-threaded server?

You would write some coordination code yourself.

Components of the system would have a handle to a shared structure that tracks how many are in unsafe states and if the system is shutting down. The API to enter an unsafe state will reject the request if the system is shutting down and the calling component would respond to that rejection by stopping. The main thread triggers the shutdown and then waits on the structure until there are no components in unsafe states.
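For illustration, a minimal sketch of such a structure, assuming a Mutex plus Condvar and purely illustrative names (ShutdownTracker, enter_unsafe, and so on):

```rust
use std::sync::{Condvar, Mutex};

#[derive(Default)]
struct State {
    in_unsafe_state: usize,
    shutting_down: bool,
}

#[derive(Default)]
pub struct ShutdownTracker {
    state: Mutex<State>,
    changed: Condvar,
}

impl ShutdownTracker {
    /// Try to enter an "unsafe" (must-not-be-interrupted) section.
    /// Returns false once shutdown has started; the caller should then stop.
    pub fn enter_unsafe(&self) -> bool {
        let mut s = self.state.lock().unwrap();
        if s.shutting_down {
            return false;
        }
        s.in_unsafe_state += 1;
        true
    }

    /// Leave the unsafe section and wake up a waiting shutdown.
    pub fn leave_unsafe(&self) {
        let mut s = self.state.lock().unwrap();
        s.in_unsafe_state -= 1;
        self.changed.notify_all();
    }

    /// Called by the main thread: flag shutdown, then wait until no
    /// component is left in an unsafe state.
    pub fn shutdown(&self) {
        let mut s = self.state.lock().unwrap();
        s.shutting_down = true;
        while s.in_unsafe_state > 0 {
            s = self.changed.wait(s).unwrap();
        }
    }
}
```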


The usual way to handle this in a C program would be to not duplicate the listening socket to begin with, so that all the threads are actually using the same FD, and then simply close the FD from the main thread so that all pending accept calls will fail with EBADF.

This can't work with Rust's std API design here, though. While it's possible to share the same TcpListener across all the threads instead of cloning it (since accept and the other methods only need a shared reference, you can use Arc or scoped threads to share it), there's by design no way to close it other than dropping it, which isn't possible once it's been shared with the other threads. Dropping the original TcpListener after cloning it doesn't affect the clones, since they're separate file descriptors even though they refer to the same socket; it's just decrementing a reference count ultimately.
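For reference, sharing one listener without try_clone() can look roughly like this with std alone (a sketch using scoped threads, Rust 1.63+; the address and worker count are placeholders, and these workers never return, so a real server would combine this with a stop flag):

```rust
use std::net::TcpListener;
use std::thread;

fn main() -> std::io::Result<()> {
    let listener = TcpListener::bind("127.0.0.1:8080")?;

    // The scope only ends once every worker returns, which these never do.
    thread::scope(|s| {
        for _ in 0..4 {
            // accept() takes &self, so a shared borrow is all each thread needs.
            s.spawn(|| loop {
                match listener.accept() {
                    Ok((_stream, addr)) => println!("connection from {addr}"),
                    Err(e) => eprintln!("accept failed: {e}"),
                }
            });
        }
    });
    Ok(())
}
```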

Allowing the socket to be closed without dropping it would create the opportunity for many bugs and thread safety issues, since FDs are just integers and the lowest free number is always used the next time one is created.

cancellable_io might be able do what you want if you just share the TcpListener with all your threads instead of trying to clone it, though? I haven't tried it.


cancellable_io might be able do what you want if you just share the TcpListener with all your threads instead of trying to clone it, though? I haven't tried it.

Thanks, maybe I will give cancellable_io another try and see if it can work without try_clone().

Dropping the original TcpListener after cloning it doesn't affect the clones, since they're separate file descriptors even though they refer to the same socket; it's just decrementing a reference count ultimately. [..] Allowing the socket to be closed without dropping it would create the opportunity for many bugs and thread safety issues, since FDs are just integers and the lowest free number is always used the next time one is created.

Well, socket2 has a Socket::shutdown() method.

Unfortunately, in contrast to the description ("This function will cause all pending and future I/O on the specified portions to return immediately with an appropriate value"), it does not abort the pending/blocked accept() call in the other thread, but rather causes any future accept() calls to fail immediately...

It seems that, based on the documentation, you should in fact be using shutdown. If it doesn't work and you can reproduce it reliably in an isolated environment, it may very well be a library bug. I would suggest opening an issue on the library's bug tracker.

You could work around it as follows: right before calling accept, set a flag which indicates that you're inside accept, and right afterwards, unset the flag (atomically). When your application shuts down, set another cancelled flag (exactly as you've done already), then wait until the 'is-accepting' flag is set (perhaps a semaphore may be useful here), then abort. The idea is that your own code can check the cancellation flag frequently and exit early in case it's set. But while you're inside a potentially long blocking call into a 3rd-party library function, you don't really care if that call ever exits normally. This requires that shutdown actually performs the necessary cleanup, even if there are pending accept calls.
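Roughly sketched (single worker; the names IN_ACCEPT and CANCELLED and the use of process::exit are illustrative, and it assumes the worker really is blocked in accept() when shutdown is requested):

```rust
use std::net::{SocketAddr, TcpListener, TcpStream};
use std::sync::atomic::{AtomicBool, Ordering};

// One flag per concern; a real server with several workers would need a
// counter (or one flag per worker) instead of a single bool.
static IN_ACCEPT: AtomicBool = AtomicBool::new(false);
static CANCELLED: AtomicBool = AtomicBool::new(false);

// Worker side: wrap the blocking call so the shutdown path can tell whether
// the thread is inside third-party code or inside our own code. The worker's
// own loop checks CANCELLED between accepts and exits once it is set.
fn guarded_accept(listener: &TcpListener) -> std::io::Result<(TcpStream, SocketAddr)> {
    IN_ACCEPT.store(true, Ordering::SeqCst);
    let result = listener.accept();
    IN_ACCEPT.store(false, Ordering::SeqCst);
    result
}

// Shutdown side: request cancellation, then only terminate once the worker is
// parked inside accept() (i.e. not in the middle of its own work).
fn shutdown() -> ! {
    CANCELLED.store(true, Ordering::SeqCst);
    while !IN_ACCEPT.load(Ordering::SeqCst) {
        std::thread::yield_now(); // a Condvar/semaphore would avoid spinning
    }
    std::process::exit(0);
}
```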

cancellable_io might be able do what you want if you just share the TcpListener with all your threads instead of trying to clone it, though? I haven't tried it.

Thanks, maybe I will give cancellable_io another try and see if it can work without try_clone().

I tried to share the same TcpListener with all threads, by using Arc<T>. It seems to work fine with the std::net one, but as soon as I switch to cancellable_io::TcpListener I get this error:

use cancellable_io as cancellable;

pub fn run(self, handler: impl Handler + 'static) {
    let socket_addr = SocketAddr::new(self.addr, self.port);
    info!("Listening on {}", socket_addr);
    let (listener, canceller) = cancellable::TcpListener::bind(socket_addr).unwrap();
    let listener = Arc::new(listener);

    let stop = Arc::new(AtomicBool::new(false));
    let mut thread_handles = Vec::new();
    let num_threads = 2usize * cpu_count();

    for _n in 0usize..num_threads {
        let my_listener = listener.clone();
        let my_stop = stop.clone();
        let my_handler = handler.clone();
        thread_handles.push(thread::spawn(move || {
            let thread_id = thread_id();
            info!("Thread #{} is up!", thread_id);
            loop {
                let connection = my_listener.accept();
                if !my_stop.load(Ordering::Acquire) {
                    match connection {
                        Ok((stream, _canceller, addr)) => Self::process_request(thread_id, &my_handler, stream, addr),
                        Err(err) => error!("Failed to establish the connection: {}", err),
                    }
                } else {
                    info!("Thread #{thread_id} is going down!");
                    break; /*shutting down */
                }
            }
        }));
    }

    let thread_count: usize = thread_handles.len();
    drop(ctrlc::set_handler(move || {
        warn!("Interrupted -> initiating server shutdown !!!");
        stop.store(true, Ordering::Release);
    }));

    for handle in thread_handles.drain(..) {
        if handle.join().is_ok() {
            debug!("Thread terminated!");
        }
    }

    info!("Server is going down! Goodbye...");
}
error[E0277]: `RefCell<mio::poll::Events>` cannot be shared between threads safely 
   --> src\server.rs:49:47
    |
49  |               thread_handles.push(thread::spawn(move || {
    |  _________________________________-------------_^
    | |                                 |
    | |                                 required by a bound introduced by this call
50  | |                 let thread_id = thread_id();
51  | |                 info!("Thread #{} is up!", thread_id);
52  | |                 loop {
...   |
63  | |                 }
64  | |             }));
    | |_____________^ `RefCell<mio::poll::Events>` cannot be shared between threads safely

Any idea what I need to do? :thinking:

Done:

:white_check_mark:

socket2 is binding directly to the POSIX socket APIs. If shutdown isn't behaving as you'd expect, that's something you'd need to bring up with kernel developers.

I'm unsure and perhaps would disagree here. The following is something I wrote on IRLO a while ago. Note that I refer to TcpStream, not TcpListener in that case.

For an example where a different aspect of TCP socket behavior depends on the platform, see this thread on Stack Overflow.

Claudiu: When using a TCP socket, what does “shutdown(sock, SHUT_RD);” actually do? […]

[…]

user207421: It has two effects, one of them platform-dependent.

  1. recv() will return zero, indicating end of stream.
  2. Any further writes to the connection by the peer will either be (a) silently thrown away by the receiver (BSD), (b) be buffered by the receiver and eventually cause send() to block or return -1/ EAGAIN/EWOULDBLOCK (Linux), or (c) cause the receiver to send an RST (Windows).

I'm not sure how this affects the Rust interface, but TcpStream::shutdown already documents some (different) OS-specific behavior, which "may change in the future":

Platform-specific behavior

Calling this function multiple times may result in different behavior, depending on the operating system. On Linux, the second call will return Ok(()), but on macOS, it will return ErrorKind::NotConnected. This may change in the future.

Any platform-specific behavior seen by the remote peer, however, is not documented, which could affect the usefulness of TcpStream::shutdown(Shutdown::Read).

But that is just one example. There are more examples (apart from networking), where OS specific differences are passed-through to the user of the standard library. For example in RwLock:

Struct std::sync::RwLock

[…]

The priority policy of the lock is dependent on the underlying operating system’s implementation, and this type does not guarantee that any particular policy will be used. In particular, a writer which is waiting to acquire the lock in write might or might not block concurrent calls to read.

In some cases, like maybe in the above case with RwLock, platform-specific behavior of the standard library may be justified (e.g. due to performance reasons), as long as these issues or undefined aspects are documented.

In case of TCP connections, I would prefer if basic operations such as opening and closing (or shutdown-ing) sockets would ideally show the same behavior on all operating systems […]

My point is: When it comes to networking, I feel like a Rust crate (or the standard library) should ideally exhibit a uniform cross-platform behavior unless there are good reasons against it (and this should be well documented).

I think it is okay to address library developers when there are issues, instead of trying to make all kernel developers of all (POSIX) operating systems provide same behavior (which is a nice goal but might not happen in the near future).

How much overhead is that magic compatibility logic going to impose?

Rust's philosophy towards networking/etc primitives has been to try to provide a minimal wrapper over raw OS primitives that systems developers are experienced with already and can refer directly to e.g. man pages for the equivalent POSIX call. A TcpStream object is currently nothing but the file descriptor. Calling read() on a stream directly forwards to read(2).

Unfortunately it looks like the internals of this library aren't threadsafe and so it can't be used in this way. I haven't tried it but I'd guess it just doesn't solve your problem.

shutdown() also isn't the solution to your problem because that's about shutting down one or both directions of a connected TCP stream, and doesn't seem to have a defined meaning when used on a listening socket rather than a connected one. The only POSIX socket API level solution to your problem that I know of is close() but closing file descriptors that are in use by other threads is a fundamentally risky operation that requires you to be extremely careful to avoid race conditions, which isn't really compatible with the Rust standard library's (sensible) API design choices.

Edit: the other "usual" way in POSIX APIs to break a thread out of a loop like this would be to set some kind of stop flag that's checked in the loop and then send the thread a non-fatal signal to make the IO operation fail with EINTR, which might work here but will need libc or similar to call pthread_kill
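For completeness, a sketch of that signal-based approach (Unix-only, assuming the libc crate; SIGUSR1 and the helper names are arbitrary). It uses the raw libc::accept so that EINTR is actually visible to the loop, since std's own accept may retry interrupted calls internally; also note that a signal delivered just before the thread blocks in accept() can be lost, so a robust version would re-send it until join() succeeds:

```rust
use std::io;
use std::net::TcpListener;
use std::os::unix::io::AsRawFd;
use std::os::unix::thread::JoinHandleExt;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::{ptr, thread, time::Duration};

// No-op handler: its only purpose is to make blocked syscalls return EINTR.
extern "C" fn noop(_sig: libc::c_int) {}

fn main() -> io::Result<()> {
    // Install the SIGUSR1 handler via sigaction *without* SA_RESTART, so a
    // blocked accept() returns EINTR instead of being transparently restarted.
    unsafe {
        let mut sa: libc::sigaction = std::mem::zeroed();
        libc::sigemptyset(&mut sa.sa_mask);
        let handler: extern "C" fn(libc::c_int) = noop;
        sa.sa_sigaction = handler as usize;
        libc::sigaction(libc::SIGUSR1, &sa, ptr::null_mut());
    }

    let listener = Arc::new(TcpListener::bind("127.0.0.1:8080")?);
    let stop = Arc::new(AtomicBool::new(false));

    let worker = {
        let (listener, stop) = (Arc::clone(&listener), Arc::clone(&stop));
        thread::spawn(move || {
            while !stop.load(Ordering::Acquire) {
                // Raw accept: returns -1 with errno EINTR when the signal arrives.
                let fd = unsafe {
                    libc::accept(listener.as_raw_fd(), ptr::null_mut(), ptr::null_mut())
                };
                if fd >= 0 {
                    // Hand the new connection to real handling code here; the
                    // sketch just closes it again.
                    unsafe { libc::close(fd) };
                }
                // On error (including EINTR) we simply loop and re-check `stop`.
            }
        })
    };

    thread::sleep(Duration::from_secs(5)); // pretend the server is running

    // Shutdown: set the flag first, then knock the worker out of accept().
    stop.store(true, Ordering::Release);
    unsafe { libc::pthread_kill(worker.as_pthread_t(), libc::SIGUSR1) };
    worker.join().unwrap();
    Ok(())
}
```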


The only POSIX socket API level solution to your problem that I know of is close() but closing file descriptors that are in use by other threads is a fundamentally risky operation that requires you to be extremely careful to avoid race conditions, which isn't really compatible with the Rust standard library's (sensible) API design choices.

I don't think that closing a socket connection from another thread would add a new "risk". After all, a socket connection can be closed "unexpectedly" at any time anyways. For example, because the network breaks away, or the peer aborts the connection arbitrarily (without graceful shutdown). So, any network code written under the assumption that a socket connection will never be closed "unexpectedly", i.e. before all data has been received and/or sent, is prone to failure in the real world, no matter what...

The real problem is that we are left with an implementation of TcpListener that makes it impossible to write an application that uses TcpListener to accept incoming connections and that will shut down "cleanly" at some point. Telling people that they should simply keep the "listener" thread running forever and terminate the application without a "graceful" shutdown seems like a poor solution. Especially for a language that, otherwise, puts so much effort into ensuring "correct" and "safe" runtime behavior :thinking:

(IMO, the availability of alternative I/O libraries is not really an excuse either)

(Yes, we still could use TcpListener in non-blocking mode, but it requires "spinning", which is a no-go)


I found the following function in winapi that may help, at least on the Windows platform:

But how can I pass a std::net::TcpListener or a socket2::Socket to that function?

The function requires a SOCKET as parameter, which is essentially a UINT_PTR value.

socket2::Socket has a method as_raw() for this purpose, but it is private and thus inaccessible :face_with_raised_eyebrow:
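(If all that's needed is the raw handle value itself, it can at least be obtained from the standard-library type; a minimal Windows-only sketch, leaving aside which winapi function it then gets passed to:)

```rust
// Sketch: obtaining the raw SOCKET from a std listener via AsRawSocket.
// What is done with that value afterwards is outside this snippet.
#[cfg(windows)]
fn raw_socket(listener: &std::net::TcpListener) -> std::os::windows::io::RawSocket {
    use std::os::windows::io::AsRawSocket;
    listener.as_raw_socket()
}
```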

The underlying TCP connection closing and the file descriptor being closed are not the same thing. The connection can be "closed" from the perspective of the TCP protocol without the file descriptor for the socket being closed.

The problem isn't closing it. The problem is that the file descriptor (integer) may be reused, e.g. by a different crate. The consequences could be disastrous, depending on the assumptions of that other crate.


Yeah, the function that we need should allow closing the TCP connection – or, in case of a "listening" socket, cause the socket to stop listening and thus abort any pending accept() calls – not necessarily close the underlying file descriptor. Either that, or at least allow accept() to set a timeout value.

(There are already read_timeout() and write_timeout() for TcpStream, so the absence of accept_timeout() for TcpListener seems inconsistent. Java's ServerSocket has had this for years.)

PRs welcomed.

The fact that no one has needed this API enough to implement it in the last decade of Rust's development may indicate something about how there are more robust ways that have already been discussed to handle this kind of shutdown operation.


This isn't possible with the actual POSIX socket API. You have the relationship backwards here: there is no "underlying file descriptor". The socket is the underlying object: the socket is a kernel object that you can only access via the syscalls that the kernel provides. A file descriptor is a userland reference to a particular kernel object, and any number of file descriptors (in any number of processes) can reference the same socket. The socket is reference counted by the kernel: it is closed once all file descriptors that refer to it are closed. There is no syscall to close a socket; only a file descriptor can be closed.
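A small illustration of that relationship (Unix-only sketch, using an ephemeral port):

```rust
// try_clone() yields a second file descriptor for the same kernel socket;
// dropping the original only closes one descriptor, not the socket itself.
use std::net::TcpListener;
use std::os::unix::io::AsRawFd;

fn main() -> std::io::Result<()> {
    let original = TcpListener::bind("127.0.0.1:0")?;
    let clone = original.try_clone()?;
    println!("fds {} and {}", original.as_raw_fd(), clone.as_raw_fd()); // two numbers
    drop(original);                      // close()s one fd; the socket stays alive
    println!("{}", clone.local_addr()?); // the clone still works
    Ok(())
}
```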

The reason read_timeout() and write_timeout() exist but accept_timeout() does not is because POSIX has socket options to set the read/write timeout, and then the timeout itself is implemented by the OS when doing a blocking operation; there is no POSIX-standard socket option for an accept timeout. It looks like Linux does treat SO_RCVTIMEO as applying to accept, but other systems may not as the spec doesn't make this clear one way or the other. If this does actually work across popular OSes then maybe an implementation of accept_timeout could be added, though?
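A hedged sketch of that experiment with socket2 (this relies on Linux-observed behavior, not something POSIX guarantees; the address and one-second timeout are placeholders):

```rust
use std::io;
use std::net::SocketAddr;
use std::time::Duration;

use socket2::{Domain, Protocol, Socket, Type};

fn main() -> io::Result<()> {
    let addr: SocketAddr = "127.0.0.1:8080".parse().unwrap();
    let listener = Socket::new(Domain::IPV4, Type::STREAM, Some(Protocol::TCP))?;
    listener.bind(&addr.into())?;
    listener.listen(128)?;

    // Sets SO_RCVTIMEO; on Linux this also bounds how long accept() blocks.
    listener.set_read_timeout(Some(Duration::from_secs(1)))?;

    loop {
        match listener.accept() {
            Ok((_conn, peer)) => println!("connection from {:?}", peer),
            // A timeout surfaces as EAGAIN/EWOULDBLOCK (WouldBlock), possibly
            // TimedOut depending on platform; a shutdown flag can be checked here.
            Err(e) if matches!(e.kind(), io::ErrorKind::WouldBlock | io::ErrorKind::TimedOut) => {
                println!("no connection within 1s");
            }
            Err(e) => return Err(e),
        }
    }
}
```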


After all, Java (JVM) has implemented setSoTimeout() on all platforms.

Enable/disable SO_TIMEOUT with the specified timeout, in milliseconds. With this option set to a non-zero timeout, a call to accept() for this ServerSocket will block for only this amount of time. If the timeout expires, a java.net.SocketTimeoutException is raised, though the ServerSocket is still valid. The option must be enabled prior to entering the blocking operation to have effect. The timeout must be > 0. A timeout of zero is interpreted as an infinite timeout.

And this must be possible on top of the "native" API. I'm not exactly sure how they did it, but I found this comment in the OpenJDK source code for socketNativeSetOption:

/*
* SO_TIMEOUT is the socket option used to specify the timeout
* for ServerSocket.accept and Socket.getInputStream().read.
* It does not typically map to a native level socket option.
* For Windows we special-case this and use the SOL_SOCKET/SO_RCVTIMEO
* socket option to specify a receive timeout on the socket.
*/

"On top of" does not mean "using", and the comment even explicitly states

It does not typically map to a native level socket option

And indeed in the corresponding unix implementation, there is

    /*
     * SO_TIMEOUT is a NOOP on Solaris/Linux
     */
    if (cmd == java_net_SocketOptions_SO_TIMEOUT) {
        return;
    }

I.e. any timeout functionality on unix lives in the higher-level ServerSocket (apparently using select, at least once upon a time), not in the OS primitives.

So I think the question circles back to

So I think the question circles back to

How much overhead is that magic compatibility logic going to impose?

Okay. But as long as accept() isn't affected, some overhead in accept_timeout() should be fine.

I mean, what is the alternative?

Currently, the only alternative I see is to "emulate" accept_timeout() with mio::TcpListener, because mio::Poll::poll() does have an optional timeout. But this also has some overhead! And having to deal with the mio events is cumbersome/complex, especially because there won't be another "readiness" event unless the previous operation failed with a WouldBlock error – which can be irritating.
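For reference, the emulation looks roughly like this (a sketch against the mio 0.8 API; address, token, and timeout are placeholders):

```rust
use std::io;
use std::time::Duration;

use mio::net::TcpListener;
use mio::{Events, Interest, Poll, Token};

const LISTENER: Token = Token(0);

fn main() -> io::Result<()> {
    let mut listener = TcpListener::bind("127.0.0.1:8080".parse().unwrap())?;
    let mut poll = Poll::new()?;
    let mut events = Events::with_capacity(8);
    poll.registry()
        .register(&mut listener, LISTENER, Interest::READABLE)?;

    loop {
        // Wait up to one second for the listener to become readable.
        poll.poll(&mut events, Some(Duration::from_secs(1)))?;
        if events.is_empty() {
            // Timed out: a good place to check a shutdown flag.
            println!("no connection within timeout");
            continue;
        }
        // Drain all pending connections until accept() returns WouldBlock,
        // otherwise the readiness event will not fire again.
        loop {
            match listener.accept() {
                Ok((_stream, addr)) => println!("connection from {addr}"),
                Err(e) if e.kind() == io::ErrorKind::WouldBlock => break,
                Err(e) => return Err(e),
            }
        }
    }
}
```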

For example, suppose that we exit from the write() loop, not because we got a WouldBlock error, but because there simply was no more data to be written (at this time). Then, the next time we want to write some data, on the same mio::TcpStream, waiting for the "writable" event will lead to an annoying deadlock situation, because there won't ever be another "writable" event for this socket – because the previous write() had not failed with WouldBlock error, as is required to rearm the event :exploding_head:

Hence we have to keep track ourselves of whether the last time we exited from the write() loop was because of a WouldBlock error or because there was "no more data to write":

  • If, previously, we exited from the write() loop because of "no more data to write", then, the next time, we must not wait for a "writable" event, but instead just go ahead.

  • Otherwise, if, previously, we exited from the write() loop because of a WouldBlock error, then, the next time, waiting for the pending "writable" event is required...

That's clearly what you should have concluded; the next logical move is to use tokio ^^

Not necessarily: you can just start a new write loop. Your very first write loop could equally have started while the socket happened to be not writable at that moment. So a later write loop has exactly the same precondition as your very first one, and this does not cause any problem.
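A sketch of that simpler pattern (mio 0.8 assumed; the stream is registered for WRITABLE interest elsewhere): always attempt the write first, and only wait for a writable event when a write actually returns WouldBlock, so no state needs to be carried between write loops.

```rust
use std::io::{self, Write};

fn write_all_nonblocking(
    stream: &mut mio::net::TcpStream,
    poll: &mut mio::Poll,
    events: &mut mio::Events,
    mut data: &[u8],
) -> io::Result<()> {
    while !data.is_empty() {
        match stream.write(data) {
            Ok(0) => return Err(io::ErrorKind::WriteZero.into()),
            Ok(n) => data = &data[n..],
            // Only now is it correct to wait for the next writable event.
            Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
                poll.poll(events, None)?;
            }
            Err(e) => return Err(e),
        }
    }
    Ok(())
}
```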