Checking for incoming data on TcpStream

Hi,

I'm not sure if I have missed something obvious but I wonder how I can check if there is any incoming data on a TcpStream? If i just do stream.read(...) I will block there. I could of course set a lower time-out but I'm not really sure that is the best idea.

The way I do this in C is like this ProDBG/remote_connection.c at main · emoon/ProDBG · GitHub

Which I poll the socket to check if there is any data to read, first after that I do a call to recv like this: ProDBG/remote_connection.c at main · emoon/ProDBG · GitHub

Edit: Another question also. How do I know if the other side has disconnected? When I do a stream.read(...) I get Ok(0) back when the other side has been closed.

Or do I have to use MIO to achieve what I want?

1 Like

Got this reply from vldm on the rust-lang gitter:

"rust std lib has only blocking IO, you can use mio for non-blocking IO. selectapi works on C preprocessor, so you can use C glue to process socket as RawFd. Or if you need only linux support, you can use unsafe code and poll epoll api. Another way is to spawn task for each socket."

If you expect a small number of clients, I'd go the easy "one thread per connection" route. Historically, with lightweight tasks, this scaled well to large numbers of clients as well, so there was no need to have a select-type API. (We do have a select for channels, which makes sense since all these tasks may want to communicate with a "master" thread.)

C select() should be easy to wrap though, I wonder why there is no crate for that yet (at least I could find none).

Yeah I will actually have small number of clients so that should be fine. Would it be best to use channels to implement this or is there some other better way?

I hacked this thing up

use std::thread;
use std::sync::mpsc::channel;
use std::time::Duration;
use std::net::TcpStream;
use std::{slice, str};
use std::io::{Write, Read};

fn main() {
    let (main_tx, main_rx) = channel::<Vec<u8>>();
    let (thread_tx, thread_rx) = channel::<Vec<u8>>();

    thread::spawn(move || {
        let mut stream = TcpStream::connect("127.0.0.1:6860").unwrap();

        loop {
            match main_rx.try_recv() {
                Ok(data) => {
                    let mut buffer = [0; 1024];
                    let _ = stream.write(&data.into_boxed_slice());
                    let _ = stream.read(&mut buffer);
                    let mut t = Vec::new();
                    for i in buffer.iter() { t.push(*i); }
                    let _ = thread_tx.send(t);
                }

                _ => (),
            }

            thread::sleep(Duration::from_millis(1)); // don't eat all CPU!
        }
    });

    loop {
        let mut send_data = Vec::new();
        // send some test data
        send_data.push(b'$');
        send_data.push(b'g');
        send_data.push(b'#');
        send_data.push(b'6');
        send_data.push(b'7');

        let _ = main_tx.send(send_data);

        match thread_rx.try_recv() {
            Ok(data) => {
                let s = unsafe {
                    let slice = slice::from_raw_parts(data.as_ptr(), data.len());
                    str::from_utf8(slice)
                };

                println!("Data back! {}", s.unwrap());
            }

            _ => (),
        }

        println!("updating...");

        thread::sleep(Duration::from_millis(100));
    }
}

I'm sure there are better and cleaner ways to do this. I also tried to use [u8; 1024] for the channels but couldn't get it t work as I would like to remove the allocations that will happen now (and the data is so small so copy is fine here)

You can just use the incoming Vec and send it back, which will avoid allocation if its capacity is large enough.

Why are you doing unsafe shenanigans on the other side? The slice is just data.as_slice() (or &data in method calls that autoderef). (But you might also use String::from_utf8 to do an in-place conversion to String.)

Good point. Will do. Does the code otherwise look like something that should work? (except all the removed error handling that is)

I took the code from here str - Rust

It will usually work, although since TCP is a stream you're not guaranteed to get a full message in one read. That's why most protocols either are line-terminated, or have a length-prefix, so that the receiving end can keep reading until the full message is delivered.

As far as I can see, that example is just to show how a &str looks under the hood. I'll open an issue for somebody to mark it more clearly.

Yeah. I'm doing this for the gdb serial protocol and while not being line-terminated it has a checksum at the end of the data so I can make sure to go on reading until that has been found (or there has been some other time-out)

Yeah that makes sense when I actually read a bit closer. Thanks for pointing that out.

In general, if it looks like you need unsafe for such a basic task, look for a better way first :slight_smile:

Yeah. In this case I just wanted to get something to work so I took first thing I found but I didn't intend to really leave it in there :slight_smile:

1 Like

Why are you polling on the channel instead of just blocking until you receive something?

    loop {
        match main_rx.recv() {
            Ok(data) => {
                let mut buffer = [0; 1024];
                let _ = stream.write(&data.into_boxed_slice());
                let _ = stream.read(&mut buffer);
                let mut t = Vec::new();
                for i in buffer.iter() { t.push(*i); }
                let _ = thread_tx.send(t);
            }

            Err(_) => return, // This means, that the sender has disconnected
                              // and no further messages can ever be received
        }
    }

That way you also don't have to use thread::sleep.

Reading/writing from/to the TcpStream is also very hackish. Both read and write are not guaranteed to read/write the whole message. And you should use some error handling. I'd at least use unwrap, even in such a quick and dirty solution.

Yeah. I have already changed this.

How would you suggest that one write and to a tcp socket and then read to get a result back?

Yes, I already stated above that I didn't do any for a quick hack. My current code (that I will use) does proper error checking on all the functions that returns a result/option.

For writing, there is write_all. For reading, as we discussed above.

You either have to know the size of the message upfront or else read until some delimiter.
In the case of GDB remote serial protocol you need both, the message is terminated by the # character followed by the fixed-size checksum. That means you have to read repeatedly until you encounter the # delimiter and for the checksum you know the size upfront.

Take a look at BufRead::read_until and Read::read_exact

Thanks! Yeah the plan was always to make the reading a bit more robust. The initial version was just to try it out.

Another tip: use take(n) to limit the maximum message size for read_until, to avoid potentially unbounded allocation.

1 Like

If you're on a Unix environment, you can extract the underlying RawFd and peek with some help from libc like so...

let stream_fd = tcp_stream.as_raw_fd();

let mut num_available: libc::c_int = 0;
unsafe {
    libc::ioctl(fd, libc::FIONREAD, &mut num_available);
}

if num_available > 0 {
    // Read all the things
}

To know if the connection is over, you will receive a zero, EOF, as you would with C and Berkley sockets.

Thanks but I need this to work on Windows also.

Ah, bummer. The plumbing is there in std::net::TcpStream (it wraps SOCKET, internally), but extracting the underlying type is only exposed as a Unix extension. If it was exposed on the Windows side, just could just substitute ioctlsocket for ioctl.