Conditional exit of TcpListener.incoming() iterator

I have a TCP listener and am handling incoming connections. At some point, I want to stop the handling of incoming connections from another thread.

To do this, I followed the advice here which recommends to send a message to the thread that is iterating over .incoming().

I did this, and it works, but I'm concerned that the way I did it isn't optimal. Is it advisable to have a thread spinning like this. Are there any better ways to solve this problem?

Here is the code that implements the solution. Note the listener must be non-blocking for this to work or else the loop will hang on the tcp_iter.next() line. Here, the loop is spinning most of the time checking if either the receiver or the incoming iterator has something for it to process.

fn host_lobby(connections: Arc<Mutex<Vec<Connection>>>, receiver: Receiver<HaltMessage>) {
    //receiver will indicated whether to kill the lobby.
    println!("Started lobby!");

    let address: SocketAddr = "127.0.0.1:12345"
        .to_socket_addrs()
        .unwrap()
        .next()
        .unwrap();

    let listener = TcpListener::bind(address)
        .map_err(|error| eprintln!("{:?}", error))
        .unwrap();

    listener.set_nonblocking(true);

    println!("Listening on {}", listener.local_addr().unwrap());

    let mut thread_handles: Vec<JoinHandle<()>> = Vec::new();

    let mut tcp_iter = listener.incoming();

    loop {
        if let Ok(_) = receiver.try_recv() { //Received a stop
            println!("Listener stopped");
            break;
        }

        if let Some(Ok(stream)) = tcp_iter.next() { //if there is a new connection
            println!("New connection!");
            let connections_clone = Arc::clone(&connections); //moved to thread
            thread_handles.push(
                thread::spawn(move || {
                    handle_connection(stream, connections_clone)
                })
            );
        }
    }

}

Have you considered using asynchronous Rust? In async, your TcpListener and Receiver will yield to other tasks (basically light-weight threads) most of the time, and only wake up when there is an incoming stream or stop signal.

More or less your only options are these:

  1. Spin the thread like now.
  2. Connect to the listener and tell it to exit.
  3. Use async await which directly supports cancellation.

If you don't want to go the async route, a clever way that I've used is to set a flag in my listener to indicate that it should stop listening, and then immediately open a new connection. That way you're not spinning, because you use the new connection to unblock the blocking read.

use std::io;
use std::net::{TcpListener, TcpStream};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;

fn main() -> io::Result<()> {
    let listener = TcpListener::bind("localhost:0")?;
    let addr = listener.local_addr()?;

    let should_exit = Arc::new(AtomicBool::new(false));
    let listener_should_exit = Arc::clone(&should_exit);

    let handle = thread::spawn(move || {
        for _conn in listener.incoming() {
            println!("got connection");

            if listener_should_exit.load(Ordering::SeqCst) {
                break;
            }

            println!("handling connection...");
        }
    });

    let _ = TcpStream::connect(addr)?;

    thread::sleep_ms(1000);

    should_exit.store(true, Ordering::SeqCst);
    let _ = TcpStream::connect(addr)?;

    handle.join().unwrap();

    println!("exited");

    Ok(())
}
1 Like

Thank you for your response. If I understand the code correctly, the spawned thread will block on the listener.incoming() iterator. This wouldn't work for me because the TCPListener might never receive another connection and therefore the thread will continue blocking regardless of the state of the atomic flag.

Edit: I see what you mean. Sorry. I misunderstood. Making a connection from the server to itself will advance the iterator. That's pretty clever!

1 Like

This topic was automatically closed 90 days after the last reply. New replies are no longer allowed.