Issues creating a simple TCP server-client using Tokio

Hi all,

I am new to Rust and, coming from a C# background, have essentially no experience with low-level languages.. anyway, I decided to jump in at the deep end for learning purposes, so excuse what is most likely horrible code/practice.

I'm trying to write a server and client that - at least for now - can read and write to each other without the connection closing, somewhat like a ping-pong I suppose. I can see there have been various posts here on the forums about implementations in the past, and I've checked out some of the documentation and tutorials out there, which have somewhat helped but aren't strictly what I was looking for. In all the cases I've seen, the client sends a message and disconnects and that's it.

I found while building up the code that the client can connect to the server and transmit, and the server can receive it; the problem came when I tried to add the "writing" from server to client (both things happening simultaneously). In this particular instance of my code (samples below), I'm getting an "early eof" error thrown from a BufReader when calling read_exact. Note: I just picked an arbitrary size for what is actually transmitted over the wire, in this case 1024, though I did see BufReader mention something about an 8KB internal buffer, so I wasn't sure if that could be part of the problem.

I have tried to spawn a task for each of the read/write portions after splitting, thinking that writing might be blocking the reading and causing an empty buffer and hence the EOF error, but I am not convinced and probably did it wrong anyway :sweat_smile:, though it made sense to me that to run concurrently they would need to reside on different threads.

Hoping that you guys can see where I may be going wrong and provide some pointers - I don't usually like to give up and ask for help, but I've spent around 24 hours in total without making any progress, so I'm a little frustrated at this point.

Code samples below

Listener code

async fn process_read(socket: OwnedReadHalf) {
    println!("Reading stream");

    let mut buf_reader = BufReader::new(socket);
    let mut buf = vec![0u8; 1024];
    let mut interval = time::interval(Duration::from_millis(1000));

    loop {
        match buf_reader.read_exact(&mut buf).await {
            Ok(n) => {
                if n == 0 {
                    println!("EOF received");
                    break;
                }
                let buf_string = String::from_utf8_lossy(&buf);
                println!("Received message: {}", buf_string);
            }
            Err(e) => {
                println!("Error receiving message: {}", e);
            }
        }

        interval.tick().await;
    }
}

async fn process_write(mut socket: OwnedWriteHalf) {
    println!("Writing stream");
    let payload = "This is a test string from server".as_bytes();
    let mut write_buffer = [0u8; 1024];

    // I'm sure this isn't required at all, but I read online something about the last
    // byte of the buffer being 0 indicating an EOF, so I was trying to work around that
    for (i, byte) in payload.iter().enumerate() {
        write_buffer[i] = *byte;
    }
    write_buffer[1023] = payload.len() as u8;

    socket.write_all(&write_buffer).await.expect("Write issue occurred");
    socket.flush().await.expect("Flush issue when writing");
}

#[tokio::main]
async fn main() -> io::Result<()> {
    const ADDR: &str = "127.0.0.1:2222";
    let listener = TcpListener::bind(&ADDR).await?;
    println!("Starting on: {:#?}", ADDR);

    loop {
        let (socket, _) = listener.accept().await?;
        println!("Client connected: {:#?}", socket.peer_addr());

        let (read, write) = socket.into_split();

        tokio::spawn(async move {
            process_read(read).await;
        });

        tokio::spawn(async move {
            process_write(write).await;
        });
    }
}

Client code

#[tokio::main]
async fn main() {
    println!("Starting client...");

    let mut write_buffer = [0u8; 1024];
    let payload = "This is a test string".as_bytes();

    let mut read_buffer = [0u8; 1024];

    let forever = task::spawn(async move {
        let mut interval = time::interval(Duration::from_millis(1000));

        // tried to create a full buffer here where the last byte is not 0,
        // like in the server code:
        //
        // for (i, byte) in payload.iter().enumerate() {
        //     write_buffer[i] = payload[i];
        // }
        // write_buffer[1023] = payload.len() as u8;

        loop {
            let stream = TcpStream::connect("127.0.0.1:2222").await.unwrap();
            let (mut read, mut write) = stream.into_split();

            tokio::spawn(async move {
                // tried using &write_buffer as param
                write.write_all(&payload).await.expect("Error writing to buffer");
                write.flush().await.expect("Flush issue");
            });

            tokio::spawn(async move {
                read.read_exact(&mut read_buffer).await.expect("Error reading from stream");
                println!("{:?}", read_buffer);
            });

            interval.tick().await;
        }
    });

    forever.await.expect("Forever loop issue");
}

Try reading the doc. If it doesn't do what you want there are other methods.

I read the doc for that method and the other methods too, but unfortunately that doesn't mean I know how to use them correctly... Anyway, I am not sure that is the solution, as I believe there might be some task/threading issue with the loop that causes the client to reconnect and the server to re-accept every iteration, but I'm not sure - which is why I came to ask the experts in the first place.

you can think of read_exact() as having an internal loop, where it will NOT return until the buffer is full or the stream is closed. this means if you give it a buffer of size 1024, it will NOT return until exactly 1024 bytes were read. this is typically used to deserialize data with specific wire formats.

usually, you should use read(), which will return some data as soon as it arrives, then you check the return value to know the size.

if you use the right read() for the read operation, then your server loop can accept new clients without problem, but the reading throughput for an individual client is throttled to 1 packet per second. the writing is fine: it sends a single packet and then closes the connection.

on the client side, again assuming you use the correct read() call, it looks fine. you create a new connection to the server every second; each connection sends a single packet, receives a single packet, and then disconnects.

so your problem is mainly that you use the wrong read_exact() call without understanding its semantics. the server reading throughput is also throttled - I can't say whether that was intended or a misunderstanding; at most, I would say it was not a good way to do it.

one thing to mention: on the client side, you spawn a forever task then immediately await it. this is unnecessary - you can just loop forever in the main function.

btw, in a real application, you typically want to coordinate the reading and writing in the form of some higher level protocol; running both directions of a duplex socket as independent simplex data channels is rarely useful.

thanks for the input - a lot of this is way over my head, so explaining like i know nothing is helpful. i do have a few follow-up questions though..

I figured that in my code using read_exact I was sending exactly 1024 bytes as part of the buffer and then reading that into a 1024-byte buffer, so I was expecting that to have no issue, but you're right - switching to just "read" stopped that "early eof" error from occurring and I now have some kind of two-way communication, though as you said it connects, sends a packet, then disconnects..

is there any way to keep the stream open? in my head i expected that the connection would open and then stay open until you (the developer) decided it was done and could close it, but perhaps i am misunderstanding and you would need some other logic that this code would be wrapped up in to maintain state.. my naive approach was to move the connect/accept/split up out of the loop, which only produced compiler errors related to moves and mutations from another iteration, such as below:

// stuff above removed for brevity
let stream = TcpStream::connect("127.0.0.1:2222").await.unwrap();
let (mut read, mut write) = stream.into_split();

loop {
    let mut interval = time::interval(Duration::from_millis(1000));

    tokio::spawn(async move {
        // tried using &write_buffer as param
        write.write_all(&payload).await.expect("Error writing to buffer");
        write.flush().await.expect("Flush issue");
    });

// more stuff below

are there any helpful docs you would recommend to understand this a little better?

i've seen mentions of MPSC channels and an example where the developer created one of these using the read and write stream split - is this the type of thing you mean?

you were only sending a 1024-byte packet from the server to the client, not the other direction. what the client sent to the server was the message "this is a test string".as_bytes()

when the reader goes out of scope, it is dropped, and the connection is closed. dropping a value means running its "destructor".

each time your client loops, it connects to the server, then the socket is split, then you spawn a reader task and a writer task. neither the reader task nor the writer task is in a loop; they run straight to the end, at which point the socket is closed.

then the main loop waits a second and repeats, creating another connection.

if you want the client to create only a single connection, which sends a packet to the server every second, your "writer" task must be in a loop. same for the reader task: it should call read() in a loop until EOF is reached, meaning the server closed the socket. something like this:

// src/client/main.rs
//...
let stream = TcpStream::connect(...).await.unwrap();
let (mut reader, mut writer) = stream.into_split();

// write to server in a loop
let writer_task = tokio::spawn(async move {
    let mut interval = time::interval(Duration::from_millis(1000));
    loop {
        writer.write_all(payload).await.unwrap();
        writer.flush().await.unwrap();
        interval.tick().await;
    }
});

// read from server in a loop, until EOF or an error
let reader_task = tokio::spawn(async move {
    let mut buffer = [0u8; 1024];
    loop {
        match reader.read(&mut buffer).await {
            // read() returning 0 means EOF: the server closed the socket
            Ok(0) | Err(_) => {
                println!("EOF or read error");
                break;
            }
            Ok(len) => println!("read len: {}, packet: {:?}", len, &buffer[..len]),
        }
    }
});

// the writer is in a loop so this task handle will never resolve
writer_task.await.unwrap();

however, on your server side, only a single packet is sent to the client, so the client's reader task will loop only twice: the first time it receives the packet, the second time it sees EOF, breaks out of the loop, and finishes.

if you want both directions to send packets indefinitely, the server's process_write() also needs to be changed to a loop.

note, you should only add a delay on the sending side, not on the receiving side.

no, what I meant was: in your code, each direction of data transfer runs completely independently, which is not a practically useful thing to do.

typically, the server first reads some data, parses it into a request, and then sends back a response to the client.

I think you are getting way ahead of yourself. I suggest you first learn the basics of networking without the complexity of async and tokio - just use the standard library. rust async can be overwhelming for new learners.

once you are comfortable working with the standard (non-async) socket APIs, you can get back to async without getting lost in the basics.

thanks for all the help, it makes a lot more sense than it did, so i appreciate the time spent explaining.. i'll continue on and see where i can go with it, using it as a resource for learning, though as you say i may need to put it on pause and revisit when my knowledge is more complete

ah - i think in the version of the code i posted originally i'd already changed the server from sending the fixed-size buffer to the string buffer as in this quote. out of curiosity i will go back, change it back to the buffer, keep my read methods as read_exact, and see what happens

i see what you meant now. i pretty much just decided upon two disparate processes working independently until i felt i could write some kind of read -> process -> respond mechanism - it seemed like the simplest implementation to get working at the time, but that is going to be the next thing i try (and most likely fail at :smiley: )