[Multi-Threading & TCP] Writing/Reading on TcpStream


#1

So basically I want to write/read to a TcpStream, but am struggling with a few things.
First TcpStream::read doesn’t seem to be blocking, so I did a simple sleep but that’s a bit of a half-assed work, so I’d like to know if anyone can help me with that. Second of all, if I try to send a number bigger than 248 it doesn’t send it all, just sends 64.

Code:

use std::io::prelude::*;
use std::net::{TcpListener, TcpStream};
use std::thread;
use std::sync::{Arc, RwLock};
use std::io::prelude::*;
use std::time::Duration;

static IP: &'static str = "127.0.0.1:34254";

fn alertMessage() {
    println!("Bound to IP: {}", IP);
}

fn read(stream: &mut TcpStream) {
        let mut buf = Vec::new(); 
        println!("Received {} bytes", stream.read_to_end(&mut buf).unwrap());
        println!("[{:?}] Receiving: {:?}", thread::current().name().unwrap(), buf);
}

fn main() {
    let listener = TcpListener::bind(IP).unwrap();
    alertMessage();

    let lcln = listener.try_clone().unwrap();
    let handle = thread::spawn(move || loop {
        match lcln.accept() {
            Ok((mut _socket, addr)) => {
                println!("new client: {:?}", addr);
                
                thread::Builder::new().name("Reader".to_string()).spawn(move || loop {
                    read(&mut _socket);
                    thread::sleep(Duration::from_secs(5));
                });
            },
            Err(e) => println!("couldn't get client: {:?}", e),
        }
    });

    if let Ok(mut stream) = TcpStream::connect("127.0.0.1:34254") {
        let mut s = 248;
        thread::Builder::new().name("Sender".to_string()).spawn(move || 
            for i in (0..10) {
                s+=1;
                println!("[{:?}] Sending: {}", thread::current().name().unwrap(), s);
                stream.write_all(&[200000]);
        });
    }
    handle.join();
}

Outputs:

Bound to IP: 127.0.0.1:34254
new client: V4(127.0.0.1:58428)
["Sender"] Sending: 249
["Sender"] Sending: 250
["Sender"] Sending: 251
["Sender"] Sending: 252
["Sender"] Sending: 253
["Sender"] Sending: 254
["Sender"] Sending: 255
["Sender"] Sending: 256
["Sender"] Sending: 257
["Sender"] Sending: 258
Received 10 bytes
["Reader"] Receiving: [64, 64, 64, 64, 64, 64, 64, 64, 64, 64]

And then every 5 seconds:

Received 0 bytes
["Reader"] Receiving: []

If I remove the sleep then the output becomes full off those. Read is not blocking at all.

Thanks for the your time reading this!


#2

read fills an existing buffer. You’re passing it one that has space for zero elements. If you change let mut buf = Vec::new() to let mut buf = [0; 1024] for example you’ll see data being read, and the stream will block until there is data available.

write doesn’t write all of the data you provide, just whatever amount the kernel decides to read. For example, if the kernel buffer only has 100 bytes of space, it will only read 100 bytes even if your input is larger. The write_all method will loop over write until the entire input has been written.


#3

Hi, thanks for the answer!
Just changed Vec to slice and it worked weird. Now the output is the buffer full of 0’s. And it still isn’t blocking, using write_all and read_to_end.
Code:

fn read(stream: &mut TcpStream) {
        let mut buf = [0; 1024]; 
        let mut v_buf = buf.to_vec();
        println!("Received {} bytes", stream.read_to_end(&mut v_buf).unwrap());
        println!("[{:?}] Receiving: {:?}", thread::current().name().unwrap(), v_buf);
}

Any ideas?
Edit:
Had to buf.to_vec() because read_to_end requires a Vec buffer and not slice.
Edit2
Fixed the buffer, I forgot to actually use the s value so it was sending a big value everytime. Though read_to_end still does not block. :confused:


#5

No, apparently it should write all the bytes.
Link to docs: https://doc.rust-lang.org/std/io/trait.Write.html#method.write_all


#6

What do you mean by not blocking? read_to_end reads until the stream returns an EOF, which for TcpStream means the other end closed the connection. read_to_end will append to the vector, so you would want to start with an empty Vec in that case.


#7

Well, I come from Java (what I learned and learn on University / College). And java sockets read blocks until some data arrives. That is not happening here (it shows empty reads) and is what I am not understanding and want to try to solve.
What I wanted to do was send some messages and receive them, am struggling with that though.
I’m trying to send strings now, but read_to_string is not blocking either, I must be missing something…


#8

Your Sender thread is sending 10 bytes, one every iteration of the loop. Your Reader thread is seeing all of those come in - your read_to_end call returns a buffer of 10 bytes. Once the Sender thread finishes its loop, it it terminates and the socket is closed. read_to_end returning 0 is an indication that the connection is closed.


#9

The difference from Java is that there you’re probably not closing the sending TCP stream, which happens automatically in Rust.


#10

No, I am not. Do I need to have a different TcpStream for every message?
e.g. Open - Close for every message.


#11

TCP is stream-oriented - applications need to define their own framing protocols which allow endpoints to determine when an individual message begins and ends.


#12

Alright thanks, that doesn’t seem easy so maybe I’ll give up on this.


#13

It sounds complicated but it doesn’t have to be.
You could use a text/line based protocol, i.e. one line is one message.
Then use BufRead::read_line to receive a single message.


#14

The framing is no different from Java - you’d need to do the same thing there as well.

As for the calls being blocking, they are :slight_smile:. If you add a sleep in the sender before it exits (and drops, which causes the EOF to be sent), you’ll see your read_to_end call blocked until that happens.


#15

I don’t know, in Java things are so… Easy? I mean I don’t have to mess with bytes and stuff, but that’s kinda of why I want to mess with Rust, since i’m gonna have a Networks project soon and might want to use Rust to send messages.
Well, in Java as I remember I would call read on the receiving socket and send on the sending socket or something like that, was really easy. read blocking until it received a message. That’s the kind of behaviour I want to have.


#16

I see. I need to try that! Thanks!


#17

If you’re working with raw Socket/Channel in java, you’re getting bytes back - it’s no different. Maybe you’re using some framework in Java that was higher level and did the framing (messaging) for you. But at the lowest level, it’s bytes all the way down :slight_smile:.

To reiterate, the Rust API is blocking if you don’t explicitly make it non-blocking, which is just like java.


#18

Well, I was using ObjectOutputStream and ObjectInputStream (to send objects) in Java.
And mine is not blocking but its probably because of the protocol not being defined.


#19

Ok, that’s a bit higher level than raw socket/stream :slight_smile:. These streams do the framing for you, essentially. I suppose the moral equivalent would be to use serde to read/write Rust structs. Or, as @troplin mentioned, using a line based protocol is an even simpler framing approach.


#20

I see. Though how do I use BufRead if TcpStream doesn’t implement it?


#21

BufRead is a trait (think interface in Java), so you’d use something that implements that; https://doc.rust-lang.org/std/io/struct.BufReader.html is what you’re after. BufReader can wrap an underlying Read impl, so you can wrap the TcpStream in a BufReader via BufReader::new(your_stream).