Tokio - stop listening for new tcp connections conditionally

Does anyone have a good pattern for terminating tokio's Incoming stream (from TcpListener::incoming()) early? Let me describe a use case to elaborate.

Server starts up and establishes a TCP listener. The server expects a certain number of client connections - it’s not an infinite stream of them. After accepting a connection, it handshakes with the client - this involves getting some identification information (to make it simple, say the client sends a json message with a string identifier). The server removes this client from the set of clients it’s expecting. If this is the last client it was expecting, the Incoming should be dropped and the server should end up closing the accept socket. It wants to be left with a list of connected clients, which is basically their TcpStream, so it can communicate with them afterwards.
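To make the desired behaviour concrete, here is a minimal synchronous sketch using only the standard library, not tokio. It assumes the handshake has already happened, so `incoming` yields (client id, connection) pairs; the function name `accept_expected` and the generic connection type `C` are made up for illustration.

```rust
use std::collections::{HashMap, HashSet};

/// Pulls (id, connection) pairs from `incoming` until every id in
/// `expected` has been seen, then stops without pulling another item.
/// `incoming` is taken by value, so it is dropped when this returns.
pub fn accept_expected<I, C>(incoming: I, mut expected: HashSet<String>) -> HashMap<String, C>
where
    I: IntoIterator<Item = (String, C)>,
{
    let mut connected = HashMap::new();
    if expected.is_empty() {
        // Nothing to wait for, so never touch the source at all.
        return connected;
    }
    for (id, conn) in incoming {
        // Unknown or duplicate ids are ignored (their connection is dropped).
        if expected.remove(&id) {
            connected.insert(id, conn);
        }
        // Stop as soon as the last expected client has arrived.
        if expected.is_empty() {
            break;
        }
    }
    connected
}
```

With a real listener, the items would come from accepting a socket and reading the identifier from it. The point of the sketch is only the termination rule: the loop ends on the same item that empties the expected set.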

What’s a good way to achieve this? I’ve not found anything in tokio (thus far) that would make this straightforward but maybe I missed something.

One option is something like this:

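let (shutdown_sender, shutdown_receiver) = futures::sync::oneshot::channel::<()>();
let incoming = listener.incoming().for_each(|(sock, addr)| ...).map_err(|_| ());

// give shutdown_sender to whatever decides when to stop listening

// select needs both futures to have the same Item and Error types, hence the map_err calls
let f = incoming.select(shutdown_receiver.map_err(|_| ()));
core.run(f);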

Thanks @sfackler. I think I tried using a channel for this by sending a signal from the chain that does the handshaking but ran into some issue that I don’t recall offhand; will try it again tomorrow though.

Edit: ok I recall the issue - the oneshot Sender has a send method that consumes self which means it can’t be used (easily at least) in any FnMut closure.

What I was looking for, but didn’t find, is something like Stream::take_while() but that would instead inform the stream to complete. In my case, take_while would actually need to wait for another accepted connection after all the expected ones have been received before it can terminate the stream; I want to terminate it at the same time as receiving the last expected connection.
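The difference is easiest to see with plain iterators, which behave the same way here. Below is a sketch of the adapter I have in mind, written for std's Iterator rather than for Stream; `TakeUntilInclusive` and `take_until_inclusive` are hypothetical names, not something tokio or futures provides.

```rust
/// Iterator adapter that yields items up to and including the first one
/// for which `done` returns true, then ends without pulling another item.
pub struct TakeUntilInclusive<I, F> {
    inner: I,
    done: F,
    finished: bool,
}

impl<I, F> Iterator for TakeUntilInclusive<I, F>
where
    I: Iterator,
    F: FnMut(&I::Item) -> bool,
{
    type Item = I::Item;

    fn next(&mut self) -> Option<I::Item> {
        if self.finished {
            // The terminating item was already yielded; do not touch `inner`.
            return None;
        }
        let item = self.inner.next()?;
        if (self.done)(&item) {
            self.finished = true;
        }
        Some(item)
    }
}

/// Convenience constructor for the adapter above.
pub fn take_until_inclusive<I, F>(inner: I, done: F) -> TakeUntilInclusive<I, F>
where
    I: Iterator,
    F: FnMut(&I::Item) -> bool,
{
    TakeUntilInclusive { inner, done, finished: false }
}
```

To stop after three items, this adapter pulls exactly three from the source. A counting predicate passed to Iterator::take_while has to pull a fourth item before it can return false, which is the extra accept I want to avoid.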

I’ll keep poking around tomorrow but this seems like a fairly simple thing conceptually. I may just end up implementing my own Stream adapter if I can’t find a way to do this (cleanly) with the stock combinators.

In the meantime, happy to hear other suggestions.

It dawned on me that I can probably make this work with loop_fn over the handshake futures: once all expected clients are connected, a Loop::Break completes the root future and the listener is dropped. I haven't tried it yet, so I'm not sure it works or is the most ergonomic approach.

I've (ab)used unbounded mpsc channels for such cases.
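A rough illustration of why an mpsc sender fits inside an FnMut closure where a oneshot sender does not. This uses std::sync::mpsc as a stand-in (an assumption on my part, the futures channel is a different type), and `signal_when_done` is a made-up helper. What matters is that send takes &self, so the sender is not consumed.

```rust
use std::sync::mpsc;

/// Feeds each id to an FnMut callback that signals over a channel once
/// `expected_count` clients have been seen. Returns how many ids were
/// processed and whether the shutdown signal was sent.
pub fn signal_when_done(ids: &[&str], expected_count: usize) -> (usize, bool) {
    let (tx, rx) = mpsc::channel::<()>();
    let mut seen = 0;

    // This closure mutates `seen`, so it is FnMut. It only borrows `tx`,
    // because `Sender::send` takes `&self` and does not consume the sender.
    let mut on_client = |_id: &str| {
        seen += 1;
        if seen == expected_count {
            let _ = tx.send(());
        }
    };

    for &id in ids {
        on_client(id);
    }

    (seen, rx.try_recv().is_ok())
}
```

In the async version, the receiving end would be the thing the accept loop is selected against.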


You could implement another struct like Collect which takes the Incoming stream and you manage your array of expected sockets in the poll method to add a condition of termination.


Yup, thanks @ndusart.