Hi folks. I’m struggling a strange problem. I can detect the close event in nodejs for a TCP socket connection but I can’t do same thing in Rust (Tokio). For example:
// Javascript
let connection = new net.Socket();
connection.on("end", (data) => {
console.log("Connection finished.")
})
For example I can connect to this connection with CURL and when connection finishes the end function invokes. But I can’t do same thing in tokio::net::TcpStream
Yes I know that we can’t really understand that is the connection live. All network connections actaully isn’t exist but at least I want to detect the FIN flag sent from the peer. How can I do this?
You’re awesome. split function returns reference with lifetime specifier. This causes problems which difficult to solve. But I used into_split and moved the values to Arc<Mutex<>>s. This is easier then struggling with references and lifetime specifiers when moving values to async functions, tokio::spawns and loops. Let me show the code:
async fn handle_sockets(
socket1: TcpStream,
socket2: TcpStream
) -> anyhow::Result<()> {
// ...
let (socket1_read_half, socket1_write_half) = socket1.into_split();
let (socket2_read_half, socket2_write_half) = socket2.into_split();
let arc_socket1_read_half = Arc::new(Mutex::new(socket1_read_half));
let arc_socket1_write_half = Arc::new(Mutex::new(socket1_write_half));
let arc_socket2_read_half = Arc::new(Mutex::new(socket2_read_half));
let arc_socket2_write_half = Arc::new(Mutex::new(socket2_write_half));
let _ = timeout(
Duration::from_secs(90),
tokio::spawn(async move {
tokio::spawn(transfer_stream(
arc_socket1_read_half,
arc_socket2_write_half,
"s1 to s2"
));
tokio::spawn(transfer_stream(
arc_socket2_read_half,
arc_socket1_write_half,
"s2 to s1"
));
}),
)
.await;
// ...
}
async fn transfer_stream(
read_stream: Arc<Mutex<OwnedReadHalf>>,
write_stream: Arc<Mutex<OwnedWriteHalf>>,
direction: &str,
) -> std::io::Result<()> {
tracing::info!("{direction} starting.");
loop {
let mut buffer = vec![0; 1024 * 256];
let bytes_cnt = read_stream.lock().await.read(&mut buffer).await?;
if bytes_cnt == 0 {
break;
}
buffer.truncate(bytes_cnt);
write_stream.lock().await.write_all(&buffer).await?;
tracing::info!("Bytes transferred to direction {direction}: {bytes_cnt}");
}
Ok(())
}
Have you checked out tokio::io::copy_bidirectional for that exact problem of forwarding contents as-is?
Anyways, you do not want to allocate vec![0; 1024 * 256] upon each loop iteration, because it is interacting with a global heap - it would be faster to move allocation out of the loop.