Hi all,
I am new to Rust and, coming from a C# background, have essentially no experience with low-level languages. I decided to jump in at the deep end for learning purposes, so please excuse what is most likely horrible code and practice.
I'm trying to write a server and client that, at least for now, can read from and write to each other without the connection closing, somewhat like a ping-pong. I can see there have been various posts here on the forums about implementations in the past, and I checked out some of the documentation and tutorials out there, which helped somewhat but were not quite what I was looking for. In all the cases I've seen, the client sends a message and disconnects, and that's it.
While building up the code, I found that the client can connect to the server and transmit, and the server can receive it. The problem appeared when I tried to add the "writing" from server to client (both things happening simultaneously). In this particular version of my code (samples below), I'm getting an "early eof" error from a BufReader when calling read_exact. Note: I just picked an arbitrary size for what is actually transmitted over the wire, in this case 1024 bytes, though I did see the BufReader docs mention something about 8 KB, so I wasn't sure if that could be part of the problem.
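To show the symptom in isolation, here is a minimal sketch that uses the standard library's blocking `Read::read_exact` on an in-memory `Cursor` instead of tokio's `BufReader` over a socket. That substitution and the `read_fixed` helper name are assumptions made purely for illustration. Asking for exactly 1024 bytes from a source that ends after 21 bytes fails with `ErrorKind::UnexpectedEof`, which is the same error kind tokio's `read_exact` is documented to return when the stream ends before the buffer is full.

```rust
use std::io::{Cursor, ErrorKind, Read};

/// Hypothetical helper: tries to fill a 1024-byte buffer from `source`,
/// mirroring the fixed-size read used in the listener code.
fn read_fixed(source: &[u8]) -> Result<Vec<u8>, ErrorKind> {
    // The Cursor stands in for a socket whose peer sent `source` and then closed.
    let mut reader = Cursor::new(source);
    let mut buf = vec![0u8; 1024];
    reader.read_exact(&mut buf).map_err(|e| e.kind())?;
    Ok(buf)
}

fn main() {
    // 21 bytes followed by end of stream: the buffer can never be filled.
    let short = read_fixed(b"This is a test string");
    println!("short input: {:?}", short.map(|b| b.len()));

    // Exactly 1024 bytes: read_exact succeeds.
    let full = read_fixed(&[7u8; 1024]);
    println!("full input: {:?}", full.map(|b| b.len()));
}
```

If this is what is happening in my case, the 8 KB figure would only be BufReader's default internal buffer capacity rather than a message size, so it seems unlikely to be the cause.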
I have tried spawning a task for each of the read/write portions after splitting, thinking that writing might be blocking the reading and causing an empty buffer and hence the EOF error, but I am not convinced and probably did it wrong anyway. It made sense to me that to run concurrently they would need to reside on different threads.
I'm hoping that you can see where I may be going wrong and provide some pointers. I don't usually like to give up and ask for help, but I've spent around 24 hours in total without making any progress, so I'm a little frustrated at this point.
Code samples below
Listener code
use std::io;
use std::time::Duration;
use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader};
use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
use tokio::net::TcpListener;
use tokio::time;

async fn process_read(mut socket: OwnedReadHalf) {
    println!("Reading stream");
    let mut buf_reader = BufReader::new(socket);
    let mut buf = vec![0u8; 1024];
    let mut interval = time::interval(Duration::from_millis(1000));
    loop {
        //match buf_reader.read_line(&mut test).await {
        match buf_reader.read_exact(&mut buf).await {
            Ok(n) => {
                if n == 0 {
                    println!("EOF received");
                    break;
                }
                let buf_string = String::from_utf8_lossy(&buf);
                println!(
                    "Received message: {}",
                    buf_string
                );
            },
            Err(e) => {
                println!("Error receiving message: {}", e);
            }
        }
        interval.tick().await;
    }
}
async fn process_write(mut socket: OwnedWriteHalf) {
    println!("Writing stream");
    let mut payload = "This is a test string from server".as_bytes();
    let mut write_buffer = [0u8; 1024];
    // i'm sure this isn't required at all, but read online something about the last byte of the buffer
    // being a 0 will indicate an EOF so was trying to work around that
    for (i, byte) in payload.iter().enumerate() {
        write_buffer[i] = payload[i];
    }
    write_buffer[1023] = payload.len() as u8;
    socket.write_all(&mut write_buffer).await.expect("Write issue occurred");
    socket.flush().await.expect("Flush issue when writing");
}
#[tokio::main]
async fn main() -> io::Result<()> {
    const ADDR: &str = "127.0.0.1:2222";
    let listener = TcpListener::bind(&ADDR).await?;
    println!("Starting on: {:#?}", ADDR);
    loop {
        let (socket, _) = listener.accept().await?;
        println!("Client connected: {:#?}", socket.local_addr());
        let (read, write) = socket.into_split();
        let mut interval = time::interval(Duration::from_millis(1000));
        tokio::spawn(async move {
            process_read(read).await;
        });
        tokio::spawn(async move {
            process_write(write).await;
        });
    }
}
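One common alternative to padding every message out to a fixed 1024 bytes is to prefix each message with its length, so the reader knows exactly how many bytes to ask `read_exact` for. The following is only a minimal sketch under stated assumptions: it uses the standard library's blocking `Read`/`Write` traits and an in-memory buffer in place of the tokio socket halves, and the names `write_frame`, `read_frame` and `MAX_FRAME`, as well as the 4-byte big-endian prefix, are hypothetical choices for illustration.

```rust
use std::io::{self, Cursor, Read, Write};

/// Hypothetical upper bound so a corrupt length prefix cannot trigger a huge allocation.
const MAX_FRAME: usize = 64 * 1024;

/// Writes one frame: a 4-byte big-endian length followed by the payload bytes.
fn write_frame<W: Write>(writer: &mut W, payload: &[u8]) -> io::Result<()> {
    if payload.len() > MAX_FRAME {
        return Err(io::Error::new(io::ErrorKind::InvalidInput, "payload too large"));
    }
    let len = payload.len() as u32; // Safe: bounded by MAX_FRAME above.
    writer.write_all(&len.to_be_bytes())?;
    writer.write_all(payload)?;
    writer.flush()
}

/// Reads one frame written by `write_frame`: first the length, then exactly that many bytes.
fn read_frame<R: Read>(reader: &mut R) -> io::Result<Vec<u8>> {
    let mut len_bytes = [0u8; 4];
    reader.read_exact(&mut len_bytes)?;
    let len = u32::from_be_bytes(len_bytes) as usize;
    if len > MAX_FRAME {
        return Err(io::Error::new(io::ErrorKind::InvalidData, "frame too large"));
    }
    let mut payload = vec![0u8; len];
    reader.read_exact(&mut payload)?;
    Ok(payload)
}

fn main() -> io::Result<()> {
    // A Vec<u8> stands in for the socket so the sketch runs without a network.
    let mut wire = Vec::new();
    write_frame(&mut wire, b"This is a test string from server")?;
    write_frame(&mut wire, b"pong")?;

    // Two messages of different sizes come back intact, one frame at a time.
    let mut reader = Cursor::new(wire);
    println!("{}", String::from_utf8_lossy(&read_frame(&mut reader)?));
    println!("{}", String::from_utf8_lossy(&read_frame(&mut reader)?));
    Ok(())
}
```

With this shape, an `UnexpectedEof` while reading the 4-byte prefix would mean the peer closed the connection between messages, while the same error during the payload read would mean a message was cut short.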
Client code
use std::time::Duration;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
use tokio::{task, time};

#[tokio::main]
async fn main() {
    println!("Starting client...");
    let mut write_buffer = [0u8; 1024];
    let mut payload = "This is a test string".as_bytes();
    let mut read_buffer = [0u8; 1024];
    let forever = task::spawn(async move {
        let mut interval = time::interval(Duration::from_millis(1000));
        // tried to create a full buffer here where the last byte is not 0 like in the server code
        //
        // for (i, byte) in payload.iter().enumerate() {
        //     write_buffer[i] = payload[i];
        // }
        // write_buffer[1023] = payload.len() as u8;
        loop {
            let stream = TcpStream::connect("127.0.0.1:2222").await.unwrap();
            let (mut read, mut write) = stream.into_split();
            tokio::spawn(async move {
                // tried using &write_buffer as param
                write.write_all(&payload).await.expect("Error writing to buffer");
                write.flush().await.expect("Flush issue");
            });
            tokio::spawn(async move {
                read.read_exact(&mut read_buffer).await.expect("Error reading from");
                println!("{:?}", read_buffer);
            });
            interval.tick().await;
        }
    });
    forever.await.expect("Forever loop issue");
}
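For comparison, this is roughly the behaviour I am after: one connection that stays open while several messages go back and forth. It is a minimal sketch, not the tokio version. To keep it dependency-free it uses the standard library's blocking sockets and one thread for the server, and the names `serve_one`, `ping_pong` and `MSG_LEN` are hypothetical. Both sides agree that every message is exactly 4 bytes, so `read_exact` always has enough data to fill its buffer, and the client connects once outside the loop instead of once per iteration.

```rust
use std::io::{self, Read, Write};
use std::net::{TcpListener, TcpStream};
use std::thread;

/// Every message is exactly this many bytes, so both sides can use `read_exact`.
const MSG_LEN: usize = 4;

/// Accepts one client and answers each 4-byte message with "pong" until the client closes.
fn serve_one(listener: TcpListener) -> io::Result<()> {
    let (mut socket, _) = listener.accept()?;
    let mut buf = [0u8; MSG_LEN];
    loop {
        match socket.read_exact(&mut buf) {
            Ok(()) => socket.write_all(b"pong")?,
            // The client closed the connection between messages: stop cleanly.
            Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return Ok(()),
            Err(e) => return Err(e),
        }
    }
}

/// Connects once and performs `rounds` ping/pong exchanges on that single connection.
fn ping_pong(rounds: usize) -> io::Result<Vec<String>> {
    // Port 0 asks the OS for any free port (an assumption; the code above uses 2222).
    let listener = TcpListener::bind("127.0.0.1:0")?;
    let addr = listener.local_addr()?;
    let server = thread::spawn(move || serve_one(listener));

    let mut stream = TcpStream::connect(addr)?;
    let mut replies = Vec::new();
    let mut buf = [0u8; MSG_LEN];
    for _ in 0..rounds {
        stream.write_all(b"ping")?;
        stream.read_exact(&mut buf)?;
        replies.push(String::from_utf8_lossy(&buf).into_owned());
    }
    // Dropping the stream closes the socket, which is what ends the server loop.
    drop(stream);
    server.join().expect("server thread panicked")?;
    Ok(replies)
}

fn main() -> io::Result<()> {
    for reply in ping_pong(3)? {
        println!("{}", reply);
    }
    Ok(())
}
```

The main differences from my client code are that the stream is created once and kept alive for all rounds, and that the amount written always matches the amount the other side tries to read.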