"Connection refused" in test, but the port is open?

/*!
	A way to transport data over TCP.
*/

// External stuff:
use tokio::{
	io::{AsyncReadExt, AsyncWriteExt, BufReader, BufWriter, Error},
	net::{
		tcp::{ OwnedReadHalf, OwnedWriteHalf, }, TcpListener, TcpSocket
	},
	sync::mpsc::{
			self, Receiver, Sender
	},
	task,
};
use std::{net::Ipv6Addr, vec};

// Internal stuff:
use super::{Connection, ConnectionConfiguration};
use crate::{
	crypto::{Encryptor, Decryptor},
	kex::KeyExchanger,
};


pub struct TcpConnection {
	listener: Option<TcpListener>,
	config: ConnectionConfiguration,
} impl TcpConnection {

	/// This performs the key exchange, returning the resulting encryptor/decryptor pair, or an error.
	async fn exchange_keys(&self, tx: &mut BufWriter<OwnedWriteHalf>, rx: &mut BufReader<OwnedReadHalf>) -> Result<(impl Encryptor + use<>, impl Decryptor + use<>), Error> {
		
		// First we need to make two key exchange objects:
		let mut i_kex = self.config.kex.generate();
		let mut o_kex = self.config.kex.generate();

		// We'll send `i_kex`'s public key first, then `o_kex`'s:
		tx.write(&i_kex.get_local_pubkey()).await?;
		tx.write(&o_kex.get_local_pubkey()).await?;
		tx.flush().await?;

		// Then, we read out the remote public keys:
		let mut i_pubkey_buf: Vec<u8> = vec![0_u8; i_kex.get_public_key_length()];
		let mut o_pubkey_buf: Vec<u8> = vec![0_u8; o_kex.get_public_key_length()];
		rx.read_exact(&mut o_pubkey_buf).await?;	// The other's output should be my input,
		rx.read_exact(&mut i_pubkey_buf).await?;	// and vis-versa.

		// And set the remote public key on the key exchangers:
		i_kex.set_remote_pubkey(&i_pubkey_buf).map_err(|e| { Error::other(e) })?;
		o_kex.set_remote_pubkey(&o_pubkey_buf).map_err(|e| { Error::other(e) })?;

		// Now we need to actually initiate the key exchange, starting with the client init step:
		tx.write(&o_kex.client_init().map_err(|e| { Error::other(e.to_string()) })?).await?;	// Send a client init.
		tx.flush().await?;
		let mut i_remote_client_init_buf: Vec<u8> = vec![0_u8; i_kex.get_client_init_length()];	// For holding the client's client init.
		rx.read_exact(&mut i_remote_client_init_buf).await?;	// Receive it.

		// Do the server init step:
		tx.write(&i_kex.server_init(&i_remote_client_init_buf).map_err(|e| { Error::other(e.to_string()) })?).await?;	// Send a server init.
		tx.flush().await?;
		let mut o_remote_server_init_buf: Vec<u8> = vec![0_u8; o_kex.get_server_init_length()];	// For holding the client's server init.
		rx.read_exact(&mut o_remote_server_init_buf).await?;

		// Do the client confirm step:
		o_kex.client_confirm(&o_remote_server_init_buf).map_err(|e| { Error::other(e.to_string()) })?;	// Done with key exchange!

		// Make the keys/crypto thingies:
		return Ok(self.config.crypto.generate(i_kex, o_kex));
	}

	/**
		Used to spin up a send task.
		`tx`: socket to send on.
		`ch`: channel to read out of.
		`en`: `Encryptor` to use.
	*/
	async fn send_task<T: Encryptor>(tx: &mut BufWriter<OwnedWriteHalf>, ch: &mut Receiver<Vec<u8>>, en: &mut T) {
		if let Err(e) = loop {
			// Read byte vectors out of the channel, until the other end is dropped:
			if let Some(mut byte_vec) = ch.recv().await {
				if let Err(e) = en.encrypt(&mut byte_vec, b"").map_err(|e| { Error::other(e.to_string()) }) {
					// Attempt to encrypt the data ☝🏻
					break Err(e);
				} else if let Err(e) = tx.write_u64_le(byte_vec.len().try_into().unwrap()).await {
					// Send the length ☝🏻
					break Err(e);
				} else if let Err(e) = tx.write(&byte_vec).await {
					// Send the data ☝🏻
					break Err(e);
				} else if let Err(e) = tx.flush().await {
					// Flush the buffer ☝🏻
					break Err(e);
				}
			} else {
				// Run if the channel is closed:
				break tx.shutdown().await;
			}
		} {
			// This gets run if an error occures in the loop:
			eprintln!("an error occured on a sender task connected to {}: {}", tx.get_ref().peer_addr().unwrap(), e);
		} else {
			// This gets run if all is dandy:
			eprintln!("closed sender task connected to {}", tx.get_ref().peer_addr().unwrap());
		}
		return;
	}

	/**
		Used to spin up a receive task.
		`rx`: socket to receive on.
		`ch`: channel to send to.
		`de`: `Decryptor` to use.
	*/
	async fn recv_task<T: Decryptor>(rx: &mut BufReader<OwnedReadHalf>, ch: &mut Sender<Vec<u8>>, de: &mut T) {
		if let Err(e) = loop {
			// First, read the length of the next message:
			if let Ok(message_length) = rx.read_u64_le().await {
				// Make a buffer to store the message:
				let mut buf: Vec<u8> = vec![0_u8; message_length.try_into().unwrap()];
				// Read it:
				if let Err(e) = rx.read_exact(&mut buf).await {
					// If receiving fails, exit the loop:
					break Err(e);
				} else if let Err(e) = de.decrypt(&mut buf, b"").map_err(|e| { Error::other(e.to_string()) }) {
					// Try to decrypt the message, and if it fails:
					break Err(e);
				} else if let Err(_) = ch.send(buf).await {
					// Try to send the decrypted message down the channel, and if that fails, then the `Receiver` must have been dropped, so the connection should be closed:
					break Ok(());
				}
			} else {
				// Run if there's an error reading the length from the socket:
				break Err(Error::last_os_error());
			}
		} {
			// Run if an error occures in the loop above:
			eprintln!("an error occured on a receiver task connected to {}: {}", rx.get_ref().peer_addr().unwrap(), e);
		} else {
			// If all is well:
			eprintln!("closed receiver task connected to {}", rx.get_ref().peer_addr().unwrap());
		}
		return;
	}

}

impl Connection for TcpConnection {
	type Error = tokio::io::Error;


	fn new(config: ConnectionConfiguration) -> Self {
		return Self {
			listener: None,
			config: config,
		};
	}

	async fn listen(&mut self) -> Result<(), Self::Error> {
		// Bind a socket and set the field with it:
		let sock: TcpSocket = TcpSocket::new_v6()?;	// IPv6.
		sock.set_reuseport(true)?;	// So that all connections can use the same port.
		sock.bind((self.config.addr, self.config.port).into())?;	// Actually bind it.
		self.listener = Some(sock.listen(1)?);
		return Ok(());
	}

	async fn accept(&mut self) -> Result<(Sender<Vec<u8>>, Receiver<Vec<u8>>), Self::Error> {
		
		// Check if we're supposed to be listening:
		if let Some(listener) = &self.listener {

			// If we are, accept one connection:
			let (stream, address) = listener.accept().await?;
			eprintln!("New connection from {}.", &address);

			// And split the stream into a sender and a receiver:
			let (rx_u, tx_u) = stream.into_split();
			let mut tx: BufWriter<OwnedWriteHalf> = BufWriter::new(tx_u);
			let mut rx: BufReader<OwnedReadHalf> = BufReader::new(rx_u);

			// Make the keys/crypto thingies:
			let (mut encryptor, mut decryptor) = self.exchange_keys(&mut tx, &mut rx).await?;

			// Make the channels:
			let (send_sender, mut send_receiver) = mpsc::channel::<Vec<u8>>(Self::CHANNEL_BUFFER_SIZE);
			let (mut recv_sender, recv_receiver) = mpsc::channel::<Vec<u8>>(Self::CHANNEL_BUFFER_SIZE);

			// Spawn the tasks, and add their abort handles to the internal vector:
			task::spawn(async move { Self::send_task(&mut tx, &mut send_receiver, &mut encryptor).await });	// Send task.
			task::spawn(async move { Self::recv_task(&mut rx, &mut recv_sender, &mut decryptor).await });	// Receive task.

			// Return the channels:
			return Ok((send_sender, recv_receiver));
		} else {
			// If this `struct` _shouldn't_ be listening:
			panic!("this `TcpConnection` is not listening!");
		}

	}

	async fn connect(&mut self, addr: Ipv6Addr, port: u16) -> Result<(Sender<Vec<u8>>, Receiver<Vec<u8>>), Self::Error> {
		
		// First, make sure that this isn't supposed to be a server:
		if let None = self.listener {
			// All good? Continue by setting up a socket:
			let sock: TcpSocket = TcpSocket::new_v6()?;	// IPv6.
			sock.set_reuseport(true)?;	// So that multiple connections can use the same port.
			sock.bind((self.config.addr, self.config.port).into())?;	// Use the configured listen address and port to connect to the remote server.

			// Now connect, and split the stream:
			let (rx_u, tx_u) = sock.connect((addr, port).into()).await?.into_split();

			// And buffer them:
			let mut tx: BufWriter<OwnedWriteHalf> = BufWriter::new(tx_u);
			let mut rx: BufReader<OwnedReadHalf> = BufReader::new(rx_u);

			// Make the encryptor and decryptor:
			let (mut encryptor, mut decryptor) = self.exchange_keys(&mut tx, &mut rx).await?;

			// Make the channels:
			let (send_sender, mut send_receiver) = mpsc::channel::<Vec<u8>>(Self::CHANNEL_BUFFER_SIZE);
			let (mut recv_sender, recv_receiver) = mpsc::channel::<Vec<u8>>(Self::CHANNEL_BUFFER_SIZE);

			// Spawn the tasks, adding their abort handles to the vector:
			task::spawn(async move { Self::send_task(&mut tx, &mut send_receiver, &mut encryptor).await });	// Send task.
			task::spawn(async move { Self::recv_task(&mut rx, &mut recv_sender, &mut decryptor).await });	// Receive task.

			return Ok((send_sender, recv_receiver));
		} else {
			// If this `struct` shouldn't be connecting:
			panic!("this `TcpConnection` should not be connecting!");
		}

	}

}


#[tokio::test]
async fn test_tcp_connection() {

	let message: &[u8] = b"The missile knows where it is at all times; it knows this because it knows where it isn't.";

	// Need a configuration first:
	let client_conf: ConnectionConfiguration = ConnectionConfiguration {
		addr: Ipv6Addr::LOCALHOST,
		port: 54320,
		connection: super::Implementation::Tcp,
		crypto: crate::crypto::Implementation::AesGcm,
		kex: crate::kex::Implementation::Kyberlib,
	};
	let server_conf: ConnectionConfiguration = ConnectionConfiguration {
		addr: Ipv6Addr::LOCALHOST,
		port: 54321,
		connection: super::Implementation::Tcp,
		crypto: crate::crypto::Implementation::AesGcm,
		kex: crate::kex::Implementation::Kyberlib,
	};

	// Make a server:
	let mut server: TcpConnection = TcpConnection::new(server_conf);
	
	// Make a client:
	let mut client: TcpConnection = TcpConnection::new(client_conf);

	// Runs the server:
	task::spawn(async move {
		server.listen().await.unwrap();
		let (tx, mut rx) = server.accept().await.unwrap();
		
		// Simple echo server:
		while let Some(data) = rx.recv().await {
			tx.send(data).await.unwrap();
		}
		return;
	});

	// Runs a client:
	let (ctx, mut crx) = client.connect(Ipv6Addr::LOCALHOST, 54321).await.unwrap();
	ctx.send(message.to_vec()).await.unwrap();
	assert_eq!(crx.recv().await.unwrap(), message.to_vec());
}

Running the test here gives the following error:

thread 'connection::qsh_tcp::test_tcp_connection' panicked at src/qshd/connection/qsh_tcp.rs:281:75:
called `Result::unwrap()` on an `Err` value: Os { code: 111, kind: ConnectionRefused, message: "Connection refused" }
stack backtrace:
   0: __rustc::rust_begin_unwind
   1: core::panicking::panic_fmt
   2: core::result::unwrap_failed
   3: core::result::Result<T,E>::unwrap
             at /usr/lib/rust/1.87.0/lib/rustlib/src/rust/library/core/src/result.rs:1109:23
   4: qshd::connection::qsh_tcp::test_tcp_connection::{{closure}}
             at ./src/qshd/connection/qsh_tcp.rs:281:23
   5: <core::pin::Pin<P> as core::future::future::Future>::poll
             at /usr/lib/rust/1.87.0/lib/rustlib/src/rust/library/core/src/future/future.rs:124:9
   6: <core::pin::Pin<P> as core::future::future::Future>::poll
             at /usr/lib/rust/1.87.0/lib/rustlib/src/rust/library/core/src/future/future.rs:124:9
   7: tokio::runtime::scheduler::current_thread::CoreGuard::block_on::{{closure}}::{{closure}}::{{closure}}
             at /home/leyvi/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/tokio-1.45.1/src/runtime/scheduler/current_thread/mod.rs:733:54
   8: tokio::task::coop::with_budget
             at /home/leyvi/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/tokio-1.45.1/src/task/coop/mod.rs:167:5
   9: tokio::task::coop::budget
             at /home/leyvi/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/tokio-1.45.1/src/task/coop/mod.rs:133:5
  10: tokio::runtime::scheduler::current_thread::CoreGuard::block_on::{{closure}}::{{closure}}
             at /home/leyvi/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/tokio-1.45.1/src/runtime/scheduler/current_thread/mod.rs:733:25
  11: tokio::runtime::scheduler::current_thread::Context::enter
             at /home/leyvi/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/tokio-1.45.1/src/runtime/scheduler/current_thread/mod.rs:432:19
  12: tokio::runtime::scheduler::current_thread::CoreGuard::block_on::{{closure}}
             at /home/leyvi/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/tokio-1.45.1/src/runtime/scheduler/current_thread/mod.rs:732:36
  13: tokio::runtime::scheduler::current_thread::CoreGuard::enter::{{closure}}
             at /home/leyvi/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/tokio-1.45.1/src/runtime/scheduler/current_thread/mod.rs:820:68
  14: tokio::runtime::context::scoped::Scoped<T>::set
             at /home/leyvi/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/tokio-1.45.1/src/runtime/context/scoped.rs:40:9
  15: tokio::runtime::context::set_scheduler::{{closure}}
             at /home/leyvi/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/tokio-1.45.1/src/runtime/context.rs:176:26
  16: std::thread::local::LocalKey<T>::try_with
             at /usr/lib/rust/1.87.0/lib/rustlib/src/rust/library/std/src/thread/local.rs:311:12
  17: std::thread::local::LocalKey<T>::with
             at /usr/lib/rust/1.87.0/lib/rustlib/src/rust/library/std/src/thread/local.rs:275:15
  18: tokio::runtime::context::set_scheduler
             at /home/leyvi/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/tokio-1.45.1/src/runtime/context.rs:176:9
  19: tokio::runtime::scheduler::current_thread::CoreGuard::enter
             at /home/leyvi/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/tokio-1.45.1/src/runtime/scheduler/current_thread/mod.rs:820:27
  20: tokio::runtime::scheduler::current_thread::CoreGuard::block_on
             at /home/leyvi/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/tokio-1.45.1/src/runtime/scheduler/current_thread/mod.rs:720:19
  21: tokio::runtime::scheduler::current_thread::CurrentThread::block_on::{{closure}}
             at /home/leyvi/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/tokio-1.45.1/src/runtime/scheduler/current_thread/mod.rs:200:28
  22: tokio::runtime::context::runtime::enter_runtime
             at /home/leyvi/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/tokio-1.45.1/src/runtime/context/runtime.rs:65:16
  23: tokio::runtime::scheduler::current_thread::CurrentThread::block_on
             at /home/leyvi/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/tokio-1.45.1/src/runtime/scheduler/current_thread/mod.rs:188:9
  24: tokio::runtime::runtime::Runtime::block_on_inner
             at /home/leyvi/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/tokio-1.45.1/src/runtime/runtime.rs:356:47
  25: tokio::runtime::runtime::Runtime::block_on
             at /home/leyvi/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/tokio-1.45.1/src/runtime/runtime.rs:330:13
  26: qshd::connection::qsh_tcp::test_tcp_connection
             at ./src/qshd/connection/qsh_tcp.rs:283:2
  27: qshd::connection::qsh_tcp::test_tcp_connection::{{closure}}
             at ./src/qshd/connection/qsh_tcp.rs:242:31
  28: core::ops::function::FnOnce::call_once
             at /usr/lib/rust/1.87.0/lib/rustlib/src/rust/library/core/src/ops/function.rs:250:5
note: Some details are omitted, run with `RUST_BACKTRACE=full` for a verbose backtrace.

The thing is, my firewall is configured to allow the connection. So what gives?

I think the server task had just been spawned but not started yet. the default tokio::test uses single threaded runtime.

try yielding before running the client code, or you may even need to add a small delay if the server had multiple awaits before starting the tcp listener, in which case a single yield may not be enough.

// run the server
tokio::spawn(...);

// add this
yield_now().await;
// or this
sleep(Duration::from_millis(1)).await;

// run a client
client.connect(...);

You were right, adding a delay did help! Thanks!