How to ensure all TcpListeners accept before connecting to their associated streams

I'm working on implementing a simple leader election algorithm using the turmoil crate. Each client in the simulation sim should handle listening for messages from other clients as well as sending messages to its "neighbor", which is a client listening on the next port over.

The simulation logic is here:

    for i in 40000..n {
	    println!("Creating client {}", i);
        sim.client(i.to_string(), async move {
            let s = Arc::new(Mutex::new(State::new()));
            let trans_s = Arc::clone(&s);
            let msgs_s = Arc::clone(&s);

            task::spawn(async move {
		        let listener = net::TcpListener::bind((ADDR, i))
                    .await
                    .expect(&format!("Failed to bind to {}", ADDR));

		        println!("Client {} connected to {} on port {}", i, ADDR, i);

                loop {
                    let (stream, _) = listener.accept().await.unwrap();
                    let length_delimited = FramedRead::new(stream, LengthDelimitedCodec::new());
                    let mut deserialized = tokio_serde::SymmetricallyFramed::new(
                        length_delimited,
                        SymmetricalMessagePack::default(),
                    );

                    if let Some(message) = deserialized.try_next().await.unwrap() {
                        let mut locked = trans_s.lock().await;

                        locked.trans(message);

                        if locked.status == Status::Leader {
                            println!("State {} with id {} has been elected leader", i, locked.u.0);
                        }
                    }
                }
            });

            task::spawn(async move {
		        loop {
		            println!("Client {} is sending message", i);
                    let locked = msgs_s.lock().await;
                    locked.msgs((i + 1) % n).await;
		        }
            });

            Ok(())
        });
    }

I am creating 10 clients that listen on ports 40000-40009.
The first async block listens for connections, while the second block repeatedly sends messages by calling the msgs function defined here:

    async fn msgs(&self, port: u16) -> () {
        let stream = net::TcpStream::connect((ADDR, port)).await.expect("Failed to connect");

	    println!("Sending message to {} on port {}", ADDR, port);
        if let Some(id) = self.send.clone() {
            let length_delimited = FramedWrite::new(stream, LengthDelimitedCodec::new());
            let mut serialized = tokio_serde::SymmetricallyFramed::new(
                length_delimited,
                SymmetricalMessagePack::default(),
            );
            serialized
                .send(rmp_serde::encode::to_vec(&id).expect("Failed to serialize"))
                .await
                .unwrap();
            }
        }

The problem here is that msgs function is attempting to connect to some port i + 1, but the (i + 1)th client which listens on that port has not actually started listening yet, so the connection is refused since the port is closed. I would like for every client to wait until every listener is in fact listening on a port, but am unsure how to do this. I was thinking of using some shared data structure or channel that the listener async block could write to, and have the sender async block until all listeners have written to it.

Would this be a valid strategy and if so what would be the best data structure to use? Also maybe there are constructs that could simplify the design?

I should note the net module is not from the std, but turmoil.

Barrier after binds and before sends for a global wait.
Notify if you want to link individual binds to senders.

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.