I'm trying to write a gaming server for my godot game with rust and tokio but I can't even seem to be able to estabilish a connection. What am I doing wrong? I tried even aske AI but it struggles to find the problem aswell.
Server (main.rs):
use tokio::net::{TcpListener, TcpStream};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
async fn handle_client(mut stream: TcpStream) {
// Send "HELLO" with length prefix
let hello_msg = b"HELLO";
send_message(&mut stream, hello_msg).await;
let mut buffer = vec![0; 4]; // Buffer for the 4-byte length header
loop {
// Read the 4-byte length prefix
if let Err(e) = stream.read_exact(&mut buffer).await {
eprintln!("Read error: {}", e);
break;
}
let msg_len = u32::from_le_bytes([buffer[0], buffer[1], buffer[2], buffer[3]]) as usize;
let mut msg_buffer = vec![0; msg_len];
// Read the full message
if let Err(e) = stream.read_exact(&mut msg_buffer).await {
eprintln!("Failed to read message: {}", e);
break;
}
// Process the message (e.g., handshake)
let msg = String::from_utf8_lossy(&msg_buffer);
if msg.contains("handshake") {
// Send acknowledgment with length prefix
let ack_msg = r#"{"type": "handshake_ack"}"#;
send_message(&mut stream, ack_msg.as_bytes()).await;
}
}
}
async fn send_message(stream: &mut TcpStream, data: &[u8]) {
let len_header = (data.len() as u32).to_le_bytes();
if let Err(e) = stream.write_all(&len_header).await {
eprintln!("Failed to send header: {}", e);
return;
}
if let Err(e) = stream.write_all(data).await {
eprintln!("Failed to send data: {}", e);
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let listener = TcpListener::bind("127.0.0.1:1138").await?;
println!("Server listening on 127.0.0.1:1138");
loop {
let (stream, _) = listener.accept().await?;
tokio::spawn(handle_client(stream));
}
}
Client (main game scene):
func _ready() -> void:
network_manager.connected.connect(_on_network_connected)
network_manager.disconnected.connect(_on_network_disconnected)
network_manager.data_received.connect(_on_network_data_received)
network_manager.connect_to_server("127.0.0.1", 1138)
func _on_network_connected():
print("Network connection verified")
# Send initial handshake
network_manager.send_data({
"type": "handshake",
"version": "1.0",
"player_id": "local_player"
})
func _on_network_disconnected(reason: String):
print("Disconnected: ", reason)
# Attempt reconnect after delay
await get_tree().create_timer(2.0).timeout
network_manager.connect_to_server("127.0.0.1", 1138)
func _on_network_data_received(data):
print("Received: ", data)
if data is String and data.begins_with("HELLO"):
print("Server greeting received")
# Send handshake after receiving "HELLO"
network_manager.send_data({
"type": "handshake",
"version": "1.0",
"player_id": "local_player"
})
elif data is String and data.contains("handshake_ack"):
print("Handshake acknowledged by server")
And in network_manager.gd:
extends Node
class_name NetworkManager
static var instance: NetworkManager
signal connected()
signal disconnected(reason: String)
signal data_received(data)
const CONNECT_TIMEOUT := 5.0
const READ_TIMEOUT := 2.0
var _tcp := StreamPeerTCP.new()
var _connected := false
var _host := "127.0.0.1"
var _port := 1138
var _connect_timer := Timer.new()
func _ready():
instance = self
add_child(_connect_timer)
_connect_timer.timeout.connect(_on_connect_timeout)
func connect_to_server(host: String, port: int) -> void:
_host = host
_port = port
if _connected:
print("Reconnecting")
print("Connecting to %s:%d..." % [_host, _port])
_tcp = StreamPeerTCP.new()
var err := _tcp.connect_to_host(_host, _port)
if err != OK:
disconnected.emit("Connection failed: %s" % error_string(err))
else:
_connect_timer.start(CONNECT_TIMEOUT)
_poll_connection()
func _poll_connection():
var status := _tcp.get_status()
match status:
StreamPeerTCP.STATUS_CONNECTED:
_connect_timer.stop()
if not _connected:
_connected = true
print("Connection established!")
connected.emit()
_poll_data()
StreamPeerTCP.STATUS_ERROR:
_handle_disconnection("Connection error")
StreamPeerTCP.STATUS_CONNECTING:
await get_tree().create_timer(0.1).timeout
_poll_connection()
_:
_handle_disconnection("Unexpected status")
func _poll_data():
while _connected:
var available_bytes = _tcp.get_available_bytes()
if available_bytes >= 4: # At least 4 bytes for the length header
# Read the 4-byte length header
var header_bytes = _tcp.get_data(4)
if header_bytes[0] != OK:
_handle_disconnection("Header read error")
return
var len_header: PackedByteArray = header_bytes[1]
var msg_len = (len_header.decode_u32(0)) # Decode little-endian u32
# Read the full message
if _tcp.get_available_bytes() >= msg_len:
var msg_bytes = _tcp.get_data(msg_len)
if msg_bytes[0] != OK:
_handle_disconnection("Message read error")
return
var data = msg_bytes[1].get_string_from_utf8()
if data != null:
data_received.emit(data)
else:
_handle_disconnection("Invalid UTF-8 data")
else:
await get_tree().create_timer(0.1).timeout
func _on_connect_timeout():
if not _connected:
_handle_disconnection("Connection timeout")
func _handle_disconnection(reason: String):
if _connected or _connect_timer.time_left > 0:
_connect_timer.stop()
_connected = false
_tcp.disconnect_from_host()
disconnected.emit(reason)
func send_data(data) -> void:
if _connected:
_tcp.put_var(data)
func is_connected_now() -> bool:
return _connected and _tcp.get_status() == StreamPeerTCP.STATUS_CONNECTED
The logs from server:
Server listening on 127.0.0.1:1138
Read error: An existing connection was forcibly closed by the remote host
The logs from client:
Connecting to 127.0.0.1:1138...
Disconnected: Connection timeout