How to chain buffers in mio


#1

I’m trying to write a simple application with mio. It involves receiving a stream of length-prefixed protobuf messages. I am having trouble splitting the TcpStream into messages.

My current approach is to copy incoming buffers byte-by-byte to a Vec until I have a whole message:

    pub fn feed(&mut self, buf: ByteBuf) -> Vec<Vec<u8>> {
        let mut result = Vec::new();
        for &byte in buf.bytes() {
            if self.is_reading_length {

                self.len_buffer.push(byte);

                if self.len_buffer.len() == 4 {
                    let mut len_buffer = Vec::new();
                    swap(&mut self.len_buffer, &mut len_buffer);
                    self.msg_len = io::Cursor::new(len_buffer).read_u32::<LittleEndian>().unwrap() as usize;
                    self.is_reading_length = false
                }

            } else {

                self.msg_buffer.push(byte);

                if self.msg_buffer.len() == self.msg_len {
                    let mut msg_buffer = Vec::new();
                    swap(&mut self.msg_buffer, &mut msg_buffer);
                    result.push(msg_buffer);
                    self.is_reading_length = true
                }
            }
        }
        result
    }

This seems pretty verbose and very inefficient (I haven’t written any benchmarks, so I may be wrong about the second point).

Is there a better way?


#2

I would create a function like:

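use std::io::Read;
use std::net::TcpStream;

fn read(stream: &mut TcpStream, total: usize) -> Option<Vec<u8>> {
    // Zero-initialize the buffer instead of calling set_len on uninitialized memory.
    let mut buffer = vec![0u8; total];
    let mut done = 0;

    while done < total {
        match stream.read(&mut buffer[done..total]) {
            // Ok(0) means the peer closed the connection; stop instead of looping forever.
            Ok(0) | Err(_) => break,
            Ok(count) => done += count,
        }
    }

    if done == total {
        Some(buffer)
    } else {
        None
    }
}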

Then use that to read the blocks of data. That should take fewer iterations. From what I remember about network code, if the far end writes X bytes, the local end usually receives those X bytes in one block. You can’t count on it, since TCP is a byte stream and may split or merge writes, but for small messages you can expect it. In practice, that means the while loop will usually execute only once for each block of bytes it reads.
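For a blocking reader, the standard library's Read::read_exact already performs this kind of loop. A minimal sketch, assuming the 4-byte little-endian length prefix from the first post; read_frame is a hypothetical helper name, and it is generic over Read so it is not tied to one stream type:

```rust
use std::io::{self, Read};

/// Reads one length-prefixed frame from a blocking reader.
/// Assumption: the prefix is a 4-byte little-endian u32, as in the first post.
fn read_frame<R: Read>(reader: &mut R) -> io::Result<Vec<u8>> {
    // Read exactly four bytes of length prefix.
    let mut len_bytes = [0u8; 4];
    reader.read_exact(&mut len_bytes)?;
    let len = u32::from_le_bytes(len_bytes) as usize;

    // Read exactly `len` bytes of payload.
    let mut payload = vec![0u8; len];
    reader.read_exact(&mut payload)?;
    Ok(payload)
}
```

This only suits blocking sockets: read_exact returns an error if the connection ends early, and the contents of the buffer are unspecified in that case. A real server would probably also want an upper bound on `len` before allocating.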


#3

Hm, I cannot rely on the far end writing a message in one chunk: the message size is almost unbounded. Also, if the read function returns None, it means that part of the message has been dropped on the floor, which is no good. So I need to keep some state in the connection…
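One possible shape for that per-connection state, as a rough sketch only: keep a single Vec of unconsumed bytes and slice complete messages out of it, instead of pushing byte by byte. FrameDecoder and its field are hypothetical names, and the 4-byte little-endian prefix is assumed from the first post:

```rust
/// Per-connection state: bytes received so far that do not yet form a complete message.
/// Assumption: each message is preceded by a 4-byte little-endian length.
struct FrameDecoder {
    pending: Vec<u8>,
}

impl FrameDecoder {
    fn new() -> Self {
        FrameDecoder { pending: Vec::new() }
    }

    /// Appends a chunk and returns every message completed by it.
    fn feed(&mut self, chunk: &[u8]) -> Vec<Vec<u8>> {
        self.pending.extend_from_slice(chunk);
        let mut messages = Vec::new();
        let mut start = 0;

        loop {
            let rest = &self.pending[start..];
            if rest.len() < 4 {
                break; // length prefix not complete yet
            }
            let len = u32::from_le_bytes([rest[0], rest[1], rest[2], rest[3]]) as usize;
            if rest.len() - 4 < len {
                break; // payload not complete yet
            }
            messages.push(rest[4..4 + len].to_vec());
            start += 4 + len;
        }

        // Drop the consumed bytes and keep the partial tail for the next call.
        self.pending.drain(..start);
        messages
    }
}
```

Nothing is lost when a message spans several reads, because the partial tail stays in `pending`. Zero-length messages also come out as empty vectors.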


#4

The code above still works if the bytes arrive one at a time; in that case it will probably be about as efficient as the original code. If multiple bytes arrive together, however, there are fewer trips through the loop.

None is returned if the connection drops. If you need the partial packet, you would do the following:

// Keep only the bytes that were actually received.
buffer.truncate(done);
Some(buffer)

in place of the final condition. That will return the data that was received before the connection terminated.


#5

Hm, my understanding is that because the socket is in non-blocking mode, I can get None even if the connection is alive.


#6

Uhm, I’m not sure why the socket is non-blocking. I didn’t see that setting on the TcpStream object. There is a set_read_timeout method, but I’m assuming that means read will return Ok(0) rather than an error, because otherwise how would you detect that the connection terminated?

If the socket is in non-blocking mode, then I don’t think you can use my method, since it would spin and use 100% of the CPU while waiting for more data from the far end.
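On a non-blocking socket, a read with no data available fails with ErrorKind::WouldBlock instead of waiting, so that case has to be told apart from a closed connection (Ok(0)) and from a real error. A sketch under that assumption; ReadOutcome and drain_available are hypothetical names, and the function is generic over Read rather than tied to a particular TcpStream type:

```rust
use std::io::{self, ErrorKind, Read};

/// How an attempt to drain a non-blocking source ended.
#[derive(Debug, PartialEq)]
enum ReadOutcome {
    /// No more data for now; come back on the next readiness event.
    WouldBlock,
    /// The peer closed the connection (read returned Ok(0)).
    Closed,
}

/// Reads until the source has nothing more to give, appending to `out`.
fn drain_available<R: Read>(source: &mut R, out: &mut Vec<u8>) -> io::Result<ReadOutcome> {
    let mut chunk = [0u8; 4096];
    loop {
        match source.read(&mut chunk) {
            Ok(0) => return Ok(ReadOutcome::Closed),
            Ok(n) => out.extend_from_slice(&chunk[..n]),
            // Not an error on a non-blocking socket: just no data right now.
            Err(e) if e.kind() == ErrorKind::WouldBlock => return Ok(ReadOutcome::WouldBlock),
            // Interrupted reads can simply be retried.
            Err(e) if e.kind() == ErrorKind::Interrupted => continue,
            Err(e) => return Err(e),
        }
    }
}
```

Returning on WouldBlock instead of retrying is what avoids the busy loop: the caller goes back to the event loop and reads again only when the socket is reported readable.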


#7

I’m using the mio library, so the socket is non-blocking.


#8

Okay, so in that case I’d probably go with something like:

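use std::io::Read;

// TcpStream here is mio's non-blocking stream type.
struct Buffer {
    data: Vec<u8>,
    expected: usize,
    received: usize,
}

impl Buffer {
    pub fn new(size: usize) -> Buffer {
        Buffer {
            // Zero-initialized, so no unsafe set_len on uninitialized memory.
            data: vec![0u8; size],
            expected: size,
            received: 0,
        }
    }

    // Reads whatever is available right now; returns true once the whole message has arrived.
    pub fn recv(&mut self, stream: &mut TcpStream) -> bool {
        while self.received < self.expected {
            match stream.read(&mut self.data[self.received..self.expected]) {
                Ok(count) if count > 0 => self.received += count,
                // Ok(0) (connection closed), WouldBlock, or any other error: stop for now.
                _ => break,
            }
        }

        self.expected == self.received
    }

    pub fn buffer(&self) -> &[u8] {
        &self.data
    }
}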

The recv() function returns true if it has received all the expected data, and false if more data is still expected. This object does not handle the connection going down; that has to be detected outside of the function.

There should probably also be some more checks, like panicking if recv() is called when received == expected or if buffer() is called when received != expected.
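Those checks could look roughly like this. It is only a sketch: CheckedBuffer and is_complete are hypothetical names, and recv is made generic over Read here purely so it can be tried without a socket:

```rust
use std::io::Read;

/// Variant of the Buffer above with the suggested sanity checks.
struct CheckedBuffer {
    data: Vec<u8>,
    received: usize,
}

impl CheckedBuffer {
    fn new(size: usize) -> Self {
        CheckedBuffer { data: vec![0; size], received: 0 }
    }

    fn is_complete(&self) -> bool {
        self.received == self.data.len()
    }

    /// Panics if called when the buffer is already full.
    fn recv<R: Read>(&mut self, stream: &mut R) -> bool {
        assert!(!self.is_complete(), "recv() called on a full buffer");
        while !self.is_complete() {
            match stream.read(&mut self.data[self.received..]) {
                Ok(n) if n > 0 => self.received += n,
                _ => break, // Ok(0), WouldBlock, or another error
            }
        }
        self.is_complete()
    }

    /// Panics if the message is not complete yet.
    fn buffer(&self) -> &[u8] {
        assert!(self.is_complete(), "buffer() called before all data arrived");
        &self.data
    }
}
```

One consequence of this design is that a zero-sized buffer is complete from the start, so calling recv() on it panics; whether that is desirable depends on whether empty messages are valid in the protocol.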