Network byte buffer reaches capacity and no longer gives information to higher-level API

I'm using the KCP protocol (experimental) asynchronously. There is a variable (&mut [u8]) called "buf" being passed around, getting filled and being trimmed. The starting size of this buffer is about 8000, then as data comes through, it gets smaller and smaller. Once the buffer's length reaches ~80, incoming data is, at best, partially filled. As such, once the limit is reached, the higher-level API cannot receive the data.

I've spent dozens of hours tracking the bug and fixing somebody else's API, and at this point I'm ready to shoot myself.

I've tried unsafely resizing the buffer to the size of incoming data, but haven't been able to without there being a SegFault. I've tried finding the original declaration of "buf", but it's nowhere to be found.

Am I missing something?

Thomas

If the buffer that's passed in is a fixed size, it's a fixed size, and you can't really change that. Presumably, from time to time, you need to return all the way up the stack and pass in a new buffer.

Without any code to look at I don't think we'll be able to offer much help.

1 Like

I can indeed share some code, but for now maybe there's another way... I have been trying to look for the declaration of the byte-buffer, but I can't find it anywhere other than in function parameters. Is mio responsible for supplying the buffer for an async Read<> operation?

use std::cell::RefCell;
use std::collections::HashMap;
use std::io::{self, Read, Write};
use std::net::SocketAddr;
use std::rc::Rc;
use std::time::{Duration, Instant};

use bytes::{Buf, BufMut, ByteOrder, LittleEndian};
use ctime;
use futures::stream::Stream;
use futures::{Async, Future, Poll};
use iovec::IoVec;
use mio::event::Evented;
use mio::{self, PollOpt, Ready, Registration, SetReadiness, Token};
use rand;
use tokio_core::net::UdpSocket;
use tokio_core::reactor::{Handle, PollEvented, Timeout};
use tokio_io::{AsyncRead, AsyncWrite};

use Kcb;
use std::alloc::Layout;

struct KcpPair {
k: Rc<RefCell<Kcb<KcpOutput>>>,
set_readiness: SetReadiness,
token: Rc<RefCell<Timeout>>,
}

pub struct KcpListener {
udp: Rc<UdpSocket>,
connections: HashMap<SocketAddr, KcpPair>,
handle: Handle,
}

pub struct Incoming {
inner: KcpListener,
}

kcp.rs

impl KcpListener {
pub fn bind(addr: &SocketAddr, handle: &Handle) -> io::Result<KcpListener> {
    let udp = UdpSocket::bind(addr, handle).unwrap();
    let listener = KcpListener {
        udp: Rc::new(udp),
        connections: HashMap::new(),
        handle: handle.clone(),
    };
    Ok(listener)
}

pub fn accept(&mut self) -> io::Result<(KcpStream, SocketAddr)> {
    let mut buf = vec![0; 1024];
    loop {
        match self.udp.recv_from(&mut buf) {
            Err(e) => {
                return Err(e);
            }
            Ok((n, addr)) => {
                if self.connections.contains_key(&addr) {
                    if let Some(kp) = self.connections.get(&addr) {
                        let mut kcb = kp.k.borrow_mut();
                        kcb.input(&buf[..n]);

                        kcb.update(clock());
                        let dur = kcb.check(clock());
                        kp.token
                            .borrow_mut()
                            .reset(Instant::now() + Duration::from_millis(dur as u64));

                        kp.set_readiness.set_readiness(mio::Ready::readable());
                    }
                } else {
                    let conv = LittleEndian::read_u32(&buf[..4]);
                    let mut kcb = Kcb::new(
                        conv,
                        KcpOutput {
                            udp: self.udp.clone(),
                            peer: addr.clone(),
                        },
                    );
                    kcb.wndsize(128, 128);
                    kcb.nodelay(1, 10, 0, true);
                    let kcb = Rc::new(RefCell::new(kcb));
                    let (registration, set_readiness) = Registration::new2();
                    let now = Instant::now();
                    let token = Timeout::new_at(now, &self.handle).unwrap();
                    let token = Rc::new(RefCell::new(token));
                    let core = KcpCore {
                        kcb: kcb.clone(),
                        registration,
                        set_readiness: set_readiness.clone(),
                        token: token.clone(),
                    };
                    let interval = KcpInterval {
                        kcb: kcb.clone(),
                        token: token.clone(),
                    };
                    &self
                        .handle
                        .spawn(interval.for_each(|_| Ok(())).then(|_| Ok(())));
                    let io = PollEvented::new(core, &self.handle).unwrap();
                    let stream = KcpStream { io };
                    stream.io.get_ref().kcb.borrow_mut().input(&buf[..n]);

                    let kcbc = kcb.clone();
                    let mut kcb1 = kcbc.borrow_mut();
                    kcb1.update(clock());
                    let dur = kcb1.check(clock());
                    token
                        .borrow_mut()
                        .reset(Instant::now() + Duration::from_millis(dur as u64));

                    stream
                        .io
                        .get_ref()
                        .set_readiness
                        .set_readiness(mio::Ready::readable());

                    let kp = KcpPair {
                        k: kcb.clone(),
                        set_readiness: set_readiness.clone(),
                        token: token.clone(),
                    };
                    self.connections.insert(addr, kp);
                    return Ok((stream, addr));
                }
            }
        }
    }
}

pub fn incoming(self) -> Incoming {
    Incoming { inner: self }
}
}

impl Stream for Incoming {
type Item = (KcpStream, SocketAddr);
type Error = io::Error;

fn poll(&mut self) -> Poll<Option<Self::Item>, io::Error> {
    Ok(Async::Ready(Some(try_nb!(self.inner.accept()))))
}
}

struct Server {
socket: Rc<UdpSocket>,
buf: Vec<u8>,
to_send: Option<(usize, SocketAddr)>,
kcb: Rc<RefCell<Kcb<KcpOutput>>>,
set_readiness: SetReadiness,

token: Rc<RefCell<Timeout>>,
}

impl Future for Server {
type Item = ();
type Error = io::Error;

fn poll(&mut self) -> Poll<(), io::Error> {
    loop {
        if let Some((size, peer)) = self.to_send {
            let mut kcb = self.kcb.borrow_mut();
            kcb.input(&self.buf[..size]);

            kcb.update(clock());
            let dur = kcb.check(clock());
            self.token
                .borrow_mut()
                .reset(Instant::now() + Duration::from_millis(dur as u64));

            self.set_readiness.set_readiness(mio::Ready::readable());
            self.to_send = None;
        }

        self.to_send = Some(try_nb!(self.socket.recv_from(&mut self.buf)));
    }
}
}

pub struct KcpStreamNew {
inner: Option<KcpStream>,
}

impl Future for KcpStreamNew {
type Item = KcpStream;
type Error = io::Error;

fn poll(&mut self) -> Poll<KcpStream, io::Error> {
    Ok(Async::Ready(self.inner.take().unwrap()))
}
}

struct KcpInterval {
kcb: Rc<RefCell<Kcb<KcpOutput>>>,
token: Rc<RefCell<Timeout>>,
}

impl Stream for KcpInterval {
type Item = ();
type Error = io::Error;

fn poll(&mut self) -> Poll<Option<()>, io::Error> {
    let mut token = self.token.borrow_mut();
    match token.poll() {
        Ok(Async::Ready(())) => {
            let mut kcb = self.kcb.borrow_mut();
            kcb.update(clock());
            let dur = kcb.check(clock());
            let next = Instant::now() + Duration::from_millis(dur as u64);
            token.reset(next);
            Ok(Async::Ready(Some(())))
        }
        Ok(Async::NotReady) => Ok(Async::NotReady),
        Err(e) => Err(e),
    }
}
}

struct KcpCore {
kcb: Rc<RefCell<Kcb<KcpOutput>>>,
registration: Registration,
set_readiness: SetReadiness,
token: Rc<RefCell<Timeout>>,
}

impl KcpCore {
pub fn read_bufs(&self, bufs: &mut [&mut IoVec]) -> io::Result<usize> {
    unimplemented!()
}

pub fn write_bufs(&self, bufs: &[&IoVec]) -> io::Result<usize> {
    unimplemented!()
}

}

impl Read for KcpCore {
fn read(&mut self, mut buf: &mut [u8]) -> io::Result<usize> {
    println!("KCP_CORE: {}", &buf.len());
    let result = self.kcb.borrow_mut().recv(buf);
    match result {
        Err(e) => Err(io::Error::new(io::ErrorKind::WouldBlock, "would block")),
        Ok(n) => Ok(n),
    }
}

}

impl Write for KcpCore {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
    let mut kcb = self.kcb.borrow_mut();
    let result = kcb.send(buf);
    kcb.update(clock());
    let dur = kcb.check(clock());
    kcb.flush();
    self.token
        .borrow_mut()
        .reset(Instant::now() + Duration::from_millis(dur as u64));
    result
}

fn flush(&mut self) -> io::Result<()> {
    Ok(())
}
}

impl Evented for KcpCore {
fn register(
    &self,
    poll: &mio::Poll,
    token: Token,
    interest: Ready,
    opts: PollOpt,
) -> io::Result<()> {
    self.registration.register(poll, token, interest, opts)
}

fn reregister(
    &self,
    poll: &mio::Poll,
    token: Token,
    interest: Ready,
    opts: PollOpt,
) -> io::Result<()> {
    self.registration.reregister(poll, token, interest, opts)
}

fn deregister(&self, poll: &mio::Poll) -> io::Result<()> {
    self.registration.deregister(poll)
}
}

pub struct KcpStream {
io: PollEvented<KcpCore>,
}

impl KcpStream {
pub fn connect(addr: &SocketAddr, handle: &Handle) -> KcpStreamNew {
    let r = match addr.is_ipv4() {
        true => "0.0.0.0:0".parse().unwrap(),
        false => "[::]:0".parse().unwrap()
    };
    //let r: SocketAddr = "0.0.0.0:0".parse().unwrap();
    //println!("[WARNING] Using IPv6 mode (kcp.rs:295)");
    //above is bug fix for "127.0.0.1:8080"
    let udp = UdpSocket::bind(&r, handle).unwrap();
    let udp = Rc::new(udp);
    let conv = rand::random::<u32>();
    let mut kcb = Kcb::new(
        conv,
        KcpOutput {
            udp: udp.clone(),
            peer: addr.clone(),
        },
    );
    kcb.wndsize(128, 128);
    kcb.nodelay(1, 10, 2, true);
    let kcb = Rc::new(RefCell::new(kcb));
    let (registration, set_readiness) = Registration::new2();
    let now = Instant::now();
    let token = Timeout::new_at(now, handle).unwrap();
    let token = Rc::new(RefCell::new(token));
    let core = KcpCore {
        kcb: kcb.clone(),
        registration,
        set_readiness: set_readiness.clone(),
        token: token.clone(),
    };

    let interval = KcpInterval {
        kcb: kcb.clone(),
        token: token.clone(),
    };
    handle.spawn(interval.for_each(|_| Ok(())).then(|_| Ok(())));
    let io = PollEvented::new(core, handle).unwrap();
    let inner = KcpStream { io };
    handle.spawn(
        Server {
            socket: udp.clone(),
            buf: vec![0; 1024],
            to_send: None,
            kcb: kcb.clone(),
            set_readiness: set_readiness.clone(),
            token: token.clone(),
        }
        .then(|_| Ok(())),
    );
    KcpStreamNew { inner: Some(inner) }
}

pub fn _connect(local_port_bind: u16, addr: &SocketAddr, handle: &Handle) -> KcpStreamNew {
    let r = match addr.is_ipv4() {
        true => format!("0.0.0.0:{}", local_port_bind).as_str().parse().unwrap(),
        false => format!("[::]:{}", local_port_bind).as_str().parse().unwrap()
    };
    //let r: SocketAddr = "0.0.0.0:0".parse().unwrap();
    //println!("[WARNING] Using IPv6 mode (kcp.rs:295)");
    //above is bug fix for "127.0.0.1:8080"
    let udp = UdpSocket::bind(&r, handle).unwrap();

    let udp = Rc::new(udp);
    let conv = rand::random::<u32>();
    let mut kcb = Kcb::new(
        conv,
        KcpOutput {
            udp: udp.clone(),
            peer: addr.clone(),
        },
    );
    kcb.wndsize(128, 128);
    kcb.nodelay(1, 10, 2, true);
    let kcb = Rc::new(RefCell::new(kcb));
    let (registration, set_readiness) = Registration::new2();
    let now = Instant::now();
    let token = Timeout::new_at(now, handle).unwrap();
    let token = Rc::new(RefCell::new(token));
    let core = KcpCore {
        kcb: kcb.clone(),
        registration,
        set_readiness: set_readiness.clone(),
        token: token.clone(),
    };

    let interval = KcpInterval {
        kcb: kcb.clone(),
        token: token.clone(),
    };
    handle.spawn(interval.for_each(|_| Ok(())).then(|_| Ok(())));
    let io = PollEvented::new(core, handle).unwrap();
    let inner = KcpStream { io };
    handle.spawn(
        Server {
            socket: udp.clone(),
            buf: vec![0; 1024],
            to_send: None,
            kcb: kcb.clone(),
            set_readiness: set_readiness.clone(),
            token: token.clone(),
        }
            .then(|_| Ok(())),
    );
    KcpStreamNew { inner: Some(inner) }
}

pub fn poll_read(&self) -> Async<()> {
    self.io.poll_read()
}

pub fn poll_write(&self) -> Async<()> {
    self.io.poll_write()
}
}

impl Read for KcpStream {
fn read(&mut self, mut buf: &mut [u8]) -> io::Result<usize> {
    //let buf0: &'static mut [u8] = &mut vec![0; 1024];
    //buf = buf0;

    let res = self.io.read(buf);
    let msg = unsafe {String::from_utf8_lossy(&buf).to_string()};
    println!("ASYNC_READ![{}]: {}", msg.len(), msg);
    res
}
}

impl Write for KcpStream {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
    self.io
        .get_ref()
        .set_readiness
        .set_readiness(mio::Ready::writable());
    self.io.write(buf)
}
fn flush(&mut self) -> io::Result<()> {
    self.io.flush()
}
}

impl AsyncRead for KcpStream {
unsafe fn prepare_uninitialized_buffer(&self, mut buf: &mut [u8]) -> bool {
    println!("SIZEOF_BUFFER_IN_PREP: {} at  {:?}", buf.len(), buf.as_ptr());
    false
}

fn read_buf<B: BufMut>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
    <&KcpStream>::read_buf(&mut &*self, buf)
}
}

impl AsyncWrite for KcpStream {
fn shutdown(&mut self) -> Poll<(), io::Error> {
    <&KcpStream>::shutdown(&mut &*self)
}

fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
    <&KcpStream>::write_buf(&mut &*self, buf)
}
}

impl<'a> Read for &'a KcpStream {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
    unimplemented!()
}
}

impl<'a> Write for &'a KcpStream {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
    unimplemented!()
}

fn flush(&mut self) -> io::Result<()> {
    unimplemented!()
}
}

impl<'a> AsyncRead for &'a KcpStream {
unsafe fn prepare_uninitialized_buffer(&self, _: &mut [u8]) -> bool {
    false
}

fn read_buf<B: BufMut>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
    if let Async::NotReady = <KcpStream>::poll_read(self) {
        return Ok(Async::NotReady);
    }
    let r = unsafe {
        let mut bufs: [_; 16] = Default::default();
        let n = buf.bytes_vec_mut(&mut bufs);
        self.io.get_ref().read_bufs(&mut bufs[..n])
    };

    match r {
        Ok(n) => {
            unsafe {
                buf.advance_mut(n);
            }
            Ok(Async::Ready(n))
        }
        Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
            self.io.need_read();
            Ok(Async::NotReady)
        }
        Err(e) => Err(e),
    }
}
}

impl<'a> AsyncWrite for &'a KcpStream {
fn shutdown(&mut self) -> Poll<(), io::Error> {
    Ok(().into())
}

fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
    if let Async::NotReady = <KcpStream>::poll_write(self) {
        return Ok(Async::NotReady);
    }
    let r = {
        let mut bufs: [_; 16] = Default::default();
        let n = buf.bytes_vec(&mut bufs);
        self.io.get_ref().write_bufs(&bufs[..n])
    };
    match r {
        Ok(n) => {
            buf.advance(n);
            Ok(Async::Ready(n))
        }
        Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
            self.io.need_write();
            Ok(Async::NotReady)
        }
        Err(e) => Err(e),
    }
}
}

#[inline]
fn clock() -> u32 {
let timespec = ctime::get_time();
let mills = timespec.sec * 1000 + timespec.nsec as i64 / 1000 / 1000;
mills as u32
}

pub struct KcpOutput {
udp: Rc<UdpSocket>,
peer: SocketAddr,
}

impl Write for KcpOutput {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
    let j = &buf[..=3];
    let msg = String::from_utf8_lossy(buf).to_string();
    //println!("[KCP] {}", msg);
    //println!("buf.len={} ||| [0] = {}, [1] = {}, [2] = {}, [3] = {}", &buf.len(), j[0], j[1], j[2], j[3]);
    self.udp.send_to(buf, &self.peer)
}

fn flush(&mut self) -> io::Result<()> {
    Ok(())
}
}
use std::{cmp};
use std::collections::VecDeque;
use std::io::{self, Cursor, Error, ErrorKind, Read, Write};

use bytes::{Buf, BufMut, BytesMut, LittleEndian};
use std::alloc::Layout;
use core::borrow::BorrowMut;
use core::mem;
use alloc::slice;

const KCP_RTO_NDL: u32 = 30; // no delay min rto
const KCP_RTO_MIN: u32 = 100; // normal min rto
const KCP_RTO_DEF: u32 = 200;
const KCP_RTO_MAX: u32 = 60_000;
const KCP_CMD_PUSH: u8 = 81; // cmd: push data
const KCP_CMD_ACK: u8 = 82; // cmd: ack
const KCP_CMD_WASK: u8 = 83; // cmd: window probe (ask)
const KCP_CMD_WINS: u8 = 84; // cmd: window size (tell)
const KCP_ASK_SEND: u32 = 0b01; // need to send KCP_CMD_WASK
const KCP_ASK_TELL: u32 = 0b10; // need to send KCP_CMD_WINS
const KCP_WND_SND: u32 = 32;
const KCP_WND_RCV: u32 = 32;
const KCP_MTU_DEF: usize = 7_500;
// const KCP_ACK_FAST: u32 = 3; // never used
const KCP_INTERVAL: u32 = 100;
const KCP_OVERHEAD: usize = 24;
// const KCP_DEADLINK: u32 = 20; // never used
const KCP_THRESH_INIT: u32 = 2;
const KCP_THRESH_MIN: u32 = 2;
const KCP_PROBE_INIT: u32 = 7_000; // 7 secs to probe window size
const KCP_PROBE_LIMIT: u32 = 120_000; // up to 120 secs to probe window

#[derive(Default)]
struct Segment {
    conv: u32,
    cmd: u8,
    frg: u8,
    wnd: u32,
    ts: u32,
    sn: u32,
    una: u32,
    resendts: u32,
    rto: u32,
    fastack: u32,
    xmit: u32,
    data: Vec<u8>,
}

impl Segment {
    fn encode(&self, buf: &mut BytesMut) {
        buf.put_u32::<LittleEndian>(self.conv);
        buf.put::<u8>(self.cmd);
        buf.put::<u8>(self.frg);
        buf.put_u16::<LittleEndian>(self.wnd as u16);
        buf.put_u32::<LittleEndian>(self.ts);
        buf.put_u32::<LittleEndian>(self.sn);
        buf.put_u32::<LittleEndian>(self.una);
        buf.put_u32::<LittleEndian>(self.data.len() as u32);
        buf.put_slice(&self.data);
    }
}

/// KCP control block
pub struct Kcb<W: Write> {
    conv: u32,
    mtu: usize,
    mss: usize,
    // state: u32, // never used
    snd_una: u32,
    snd_nxt: u32,
    rcv_nxt: u32,

    // ts_recent: u32, // never used
    // ts_lastack: u32, // never used
    ssthresh: u32,

    rx_rttval: u32,
    rx_srtt: u32,
    rx_rto: u32,
    rx_minrto: u32,

    snd_wnd: u32,
    rcv_wnd: u32,
    rmt_wnd: u32,
    cwnd: u32,
    probe: u32,

    current: u32,
    interval: u32,
    ts_flush: u32,
    xmit: u32,

    nodelay: u32,
    updated: bool,

    ts_probe: u32,
    probe_wait: u32,

    // dead_link: u32, // never used
    incr: u32,

    snd_queue: VecDeque<Segment>,
    rcv_queue: VecDeque<Segment>,
    snd_buf: VecDeque<Segment>,
    rcv_buf: VecDeque<Segment>,

    acklist: Vec<(u32, u32)>,

    // user: String,
    buffer: BytesMut,

    fastresend: u32,

    nocwnd: bool,
    stream: bool,

    output: W,
}

impl<W: Write> Kcb<W> {
    /// create a new kcp control object, `conv` must equal in two endpoint
    /// from the same connection. `user` will be passed to the output callback
    pub fn new(conv: u32, output: W) -> Kcb<W> {
        Kcb {
            // state: 0,
            snd_una: 0,
            snd_nxt: 0,
            rcv_nxt: 0,
            // ts_recent: 0,
            // ts_lastack: 0,
            rx_rttval: 0,
            rx_srtt: 0,
            cwnd: 0,
            probe: 0,
            current: 0,
            xmit: 0,
            nodelay: 0,
            updated: false,
            ts_probe: 0,
            probe_wait: 0,
            incr: 0,
            fastresend: 0,
            nocwnd: false,
            stream: false,

            conv,
            snd_wnd: KCP_WND_SND,
            rcv_wnd: KCP_WND_RCV,
            rmt_wnd: KCP_WND_RCV,
            mtu: KCP_MTU_DEF,
            mss: KCP_MTU_DEF - KCP_OVERHEAD,
            // user: user,
            buffer: BytesMut::with_capacity((KCP_MTU_DEF + KCP_OVERHEAD) * 3),
            snd_queue: VecDeque::new(),
            rcv_queue: VecDeque::new(),
            snd_buf: VecDeque::new(),
            rcv_buf: VecDeque::new(),
            acklist: Vec::new(),
            rx_rto: KCP_RTO_DEF,
            rx_minrto: KCP_RTO_MIN,
            interval: KCP_INTERVAL,
            ts_flush: KCP_INTERVAL,
            ssthresh: KCP_THRESH_INIT, // dead_link: KCP_DEADLINK,
            output,
        }
    }

    /// user/upper level recv: returns size, returns Err for EAGAIN
    pub fn recv(&mut self, mut buf: &mut [u8]) -> io::Result<usize> {

        if self.rcv_queue.is_empty() {
            println!("EOF");
            return Err(Error::new(ErrorKind::Other, "EOF"));
        }
        let peeksize = match self.peeksize() {
            Ok(x) => x,
            Err(_) => {
                eprintln!("unexpected EOF");
                return Err(Error::new(ErrorKind::UnexpectedEof, "unexpected EOF"))
            },
        };

        if peeksize > buf.len() {
            eprintln!("Peeksize > buf.len() ::: {} , {}", peeksize, buf.len());
            eprintln!("Buffer before: {}", buf.len());
            eprintln!("Buffer after: {}", buf.len());
            //return Err(Error::new(ErrorKind::InvalidInput, "short buffer"));
        }

        let recover = self.rcv_queue.len() >= self.rcv_wnd as usize;


        // merge fragment
        let mut buf = Cursor::new(buf);

        let mut index: usize = 0;
        for seg in &self.rcv_queue {
            buf.write_all(&seg.data)?;
            index += 1;
            if seg.frg == 0 {
                break;
            }
        }
        if index > 0 {
            let new_rcv_queue = self.rcv_queue.split_off(index);
            println!("RCV_QUEUE LEN_BEFORE: {}", self.rcv_queue.len());
            self.rcv_queue = new_rcv_queue;
            println!("RCV_QUEUE_LEN_AFTER: {}", self.rcv_queue.len());
        } else {
            eprintln!("ERROR: INDEX==0");
        }
        assert_eq!(buf.position() as usize, peeksize);

        // move available data from rcv_buf -> rcv_queue
        index = 0;
        let mut nrcv_que = self.rcv_queue.len();

        for seg in &self.rcv_buf {
            if seg.sn == self.rcv_nxt && nrcv_que < self.rcv_wnd as usize {
                nrcv_que += 1;
                self.rcv_nxt += 1;
                index += 1;
            } else {
                break;
            }
        }

        if index > 0 {
            let new_rcv_buf = self.rcv_buf.split_off(index);
            self.rcv_queue.append(&mut self.rcv_buf);
            self.rcv_buf = new_rcv_buf;
        }

        // fast recover
        if self.rcv_queue.len() < self.rcv_wnd as usize && recover {
            // ready to send back KCP_CMD_WINS in `flush`
            // tell remote my window size
            self.probe |= KCP_ASK_TELL;
        }

        println!("INDEX_SIZE = {}", buf.position());
        Ok(buf.position() as usize)
    }

    /// check the size of next message in the recv queue
    fn peeksize(&self) -> Result<usize, i32> {
        let seg = match self.rcv_queue.front() {
            Some(x) => x,
            None => return Err(-1),
        };
        if seg.frg == 0 {
            return Ok(seg.data.len());
        }
        if self.rcv_queue.len() < (seg.frg + 1) as usize {
            return Err(-1);
        }
        let mut length: usize = 0;
        for seg in &self.rcv_queue {
            length += seg.data.len();
            if seg.frg == 0 {
                break;
            }
        }

        Ok(length)
    }

    /// user/upper level send, returns Err for error
    pub fn send(&mut self, buf: &[u8]) -> io::Result<usize> {
        let n = buf.len();
        if n == 0 {
            return Err(Error::new(ErrorKind::InvalidInput, "no data available"));
        }
        let mut buf = Cursor::new(buf);

        // append to previous segment in streaming mode (if possible)
        if self.stream {
            if let Some(seg) = self.snd_queue.back_mut() {
                let l = seg.data.len();
                if l < self.mss as usize {
                    let new_len = cmp::min(l + n, self.mss as usize);
                    seg.data.resize(new_len, 0);
                    buf.read_exact(&mut seg.data[l..new_len])?;
                    seg.frg = 0;
                    if buf.remaining() == 0 {
                        return Ok(1);
                    }
                }
            };
        }

        let count = if buf.remaining() <= self.mss as usize {
            1
        } else {
            (buf.remaining() + self.mss as usize - 1) / self.mss as usize
        };

        if count > 255 {
            return Err(Error::new(ErrorKind::InvalidInput, "data too long"));
        }
        assert!(count > 0);
        let count = count as u8;

        // fragment
        for i in 0..count {
            let size = cmp::min(self.mss as usize, buf.remaining());
            let mut seg = Segment::default();
            seg.data.resize(size, 0);
            buf.read_exact(&mut seg.data)?;
            seg.frg = if !self.stream { count - i - 1 } else { 0 };
            self.snd_queue.push_back(seg);
        }
        Ok(n - buf.remaining())
    }

    fn update_ack(&mut self, rtt: u32) {
        if self.rx_srtt == 0 {
            self.rx_srtt = rtt;
            self.rx_rttval = rtt / 2;
        } else {
            let delta = if rtt > self.rx_srtt {
                rtt - self.rx_srtt
            } else {
                self.rx_srtt - rtt
            };
            self.rx_rttval = (3 * self.rx_rttval + delta) / 4;
            self.rx_srtt = (7 * self.rx_srtt + rtt) / 8;
            if self.rx_srtt < 1 {
                self.rx_srtt = 1;
            }
        }
        let rto = self.rx_srtt + cmp::max(self.interval, 4 * self.rx_rttval);
        self.rx_rto = bound(self.rx_minrto, rto, KCP_RTO_MAX);
    }

    #[inline]
    fn shrink_buf(&mut self) {
        self.snd_una = match self.snd_buf.front() {
            Some(seg) => seg.sn,
            None => self.snd_nxt,
        };
    }

    fn parse_ack(&mut self, sn: u32) {
        if sn < self.snd_una || sn >= self.snd_nxt {
            return;
        }
        for i in 0..self.snd_buf.len() {
            if sn == self.snd_buf[i].sn {
                self.snd_buf.remove(i);
                break;
            } else if sn < self.snd_buf[i].sn {
                break;
            }
        }
    }

    fn parse_una(&mut self, una: u32) {
        let mut index: usize = 0;
        for seg in &self.snd_buf {
            if una > seg.sn {
                index += 1;
            } else {
                break;
            }
        }
        if index > 0 {
            let new_snd_buf = self.snd_buf.split_off(index);
            self.snd_buf = new_snd_buf;
        }
    }

    fn parse_fastack(&mut self, sn: u32) {
        if sn < self.snd_una || sn >= self.snd_nxt {
            return;
        }
        for seg in &mut self.snd_buf {
            if sn < seg.sn {
                break;
            } else if sn != seg.sn {
                seg.fastack += 1;
            }
        }
    }

    fn parse_data(&mut self, newseg: Segment) {
        let sn = newseg.sn;
        if sn >= self.rcv_nxt + self.rcv_wnd || sn < self.rcv_nxt {
            //ikcp_segment_delete(kcp, newseg);
            return;
        }

        let mut repeat = false;
        let mut index: usize = self.rcv_buf.len();
        for seg in self.rcv_buf.iter().rev() {
            if sn == seg.sn {
                repeat = true;
                break;
            } else if sn > seg.sn {
                break;
            }
            index -= 1;
        }

        if !repeat {
            self.rcv_buf.insert(index, newseg);
        } else {
            // ikcp_segment_delete(kcp, newseg);
        }

        // move available data from rcv_buf -> rcv_queue
        index = 0;
        let mut nrcv_que = self.rcv_queue.len();
        for seg in &self.rcv_buf {
            if seg.sn == self.rcv_nxt && nrcv_que < self.rcv_wnd as usize {
                nrcv_que += 1;
                self.rcv_nxt += 1;
                index += 1;
            } else {
                break;
            }
        }
        if index > 0 {
            let new_rcv_buf = self.rcv_buf.split_off(index);
            self.rcv_queue.append(&mut self.rcv_buf);
            self.rcv_buf = new_rcv_buf;
        }
    }

    /// when you received a low level packet (eg. UDP packet), call it
    pub fn input(&mut self, buf: &[u8]) -> io::Result<usize> {
        let s_len = &buf.len();
        let bytes = buf.clone();
        println!("INPUT_LEN_KCB: {}", s_len);
        let n = buf.len();
        let mut buf = Cursor::new(buf);

        if buf.remaining() < KCP_OVERHEAD {
            return Err(Error::new(ErrorKind::InvalidData, "invalid data"));
        }
        let old_una = self.snd_una;
        let mut flag = false;
        let mut maxack: u32 = 0;
        while buf.remaining() >= KCP_OVERHEAD {
            let conv = buf.get_u32::<LittleEndian>();
            if conv != self.conv {
                return Err(Error::new(ErrorKind::InvalidData, "invalid data"));
            }

            let cmd = buf.get_u8();
            let frg = buf.get_u8();
            let wnd = buf.get_u16::<LittleEndian>();
            let ts = buf.get_u32::<LittleEndian>();
            let sn = buf.get_u32::<LittleEndian>();
            let una = buf.get_u32::<LittleEndian>();
            let len = buf.get_u32::<LittleEndian>();

            let len = len as usize;

            let start = s_len - len;
            let payload = &bytes[start..];
            let msg = String::from_utf8_lossy(payload).to_string();

            println!("payload_len: {}", &payload.len());
            println!("payload: {}", msg);

            if buf.remaining() < len {
                return Err(Error::new(ErrorKind::UnexpectedEof, "unexpected EOF"));
            }

            if cmd != KCP_CMD_PUSH && cmd != KCP_CMD_ACK && cmd != KCP_CMD_WASK &&
                cmd != KCP_CMD_WINS
            {
                return Err(Error::new(ErrorKind::InvalidData, "invalid data"));
            }

            self.rmt_wnd = wnd as u32;
            self.parse_una(una);
            self.shrink_buf();

            if cmd == KCP_CMD_ACK {
                let rtt = timediff(self.current, ts);
                if rtt >= 0 {
                    self.update_ack(rtt as u32);
                }
                self.parse_ack(sn);
                self.shrink_buf();
                if !flag {
                    flag = true;
                    maxack = sn;
                } else {
                    if sn > maxack {
                        maxack = sn;
                    }
                }
            } else if cmd == KCP_CMD_PUSH {
                if sn < self.rcv_nxt + self.rcv_wnd {
                    self.acklist.push((sn, ts));
                    if sn >= self.rcv_nxt {
                        let mut seg = Segment::default();
                        seg.conv = conv;
                        seg.cmd = cmd;
                        seg.frg = frg;
                        seg.wnd = wnd as u32;
                        seg.ts = ts;
                        seg.sn = sn;
                        seg.una = una;
                        seg.data.resize(len, 0);
                        buf.read_exact(&mut seg.data)?;
                        self.parse_data(seg);
                    }
                }
            } else if cmd == KCP_CMD_WASK {
                // ready to send back KCP_CMD_WINS in `flush`
                // tell remote my window size
                self.probe |= KCP_ASK_TELL;
            } else if cmd == KCP_CMD_WINS {
                // do nothing
            } else {
                return Err(Error::new(ErrorKind::InvalidData, "invalid data"));
            }
        }
        if flag {
            self.parse_fastack(maxack);
        }

        if self.snd_una > old_una {
            if self.cwnd < self.rmt_wnd {
                let mss = self.mss as u32;
                if self.cwnd < self.ssthresh {
                    self.cwnd += 1;
                    self.incr += mss;
                } else {
                    if self.incr < mss {
                        self.incr = mss;
                    }
                    self.incr += (mss * mss) / self.incr + (mss / 16);
                    if (self.cwnd + 1) * mss <= self.incr {
                        self.cwnd += 1;
                    }
                }
                if self.cwnd > self.rmt_wnd {
                    self.cwnd = self.rmt_wnd;
                    self.incr = self.rmt_wnd * mss;
                }
            }
        }
        let remaining = n - buf.remaining();
        Ok(remaining)
    }

    fn wnd_unused(&self) -> u32 {
        let nrcv_que = self.rcv_queue.len() as u32;
        if nrcv_que < self.rcv_wnd {
            return self.rcv_wnd - nrcv_que;
        }
        0
    }

    /// flush pending data
    pub fn flush(&mut self) {
        // `update` haven't been called.
        if !self.updated {
            return;
        }
        let current = self.current;
        let mut lost = false;
        let mut change = false;
        let mut seg = Segment::default();

        seg.conv = self.conv;
        seg.cmd = KCP_CMD_ACK;
        seg.wnd = self.wnd_unused();
        seg.una = self.rcv_nxt;

        // flush acknowledges
        for ack in &self.acklist {
            if self.buffer.remaining_mut() + KCP_OVERHEAD > self.mtu {
                self.output.write_all(&self.buffer);

                self.buffer.clear();
            }
            seg.sn = ack.0;
            seg.ts = ack.1;
            seg.encode(&mut self.buffer);
        }
        self.acklist.clear();

        // probe window size (if remote window size equals zero)
        if self.rmt_wnd == 0 {
            if self.probe_wait == 0 {
                self.probe_wait = KCP_PROBE_INIT;
                self.ts_probe = self.current + self.probe_wait;
            } else {
                if timediff(self.current, self.ts_probe) >= 0 {
                    if self.probe_wait < KCP_PROBE_INIT {
                        self.probe_wait = KCP_PROBE_INIT;
                    }
                    self.probe_wait += self.probe_wait / 2;
                    if self.probe_wait > KCP_PROBE_LIMIT {
                        self.probe_wait = KCP_PROBE_LIMIT;
                    }
                    self.ts_probe = self.current + self.probe_wait;
                    self.probe |= KCP_ASK_SEND;
                }
            }
        } else {
            self.ts_probe = 0;
            self.probe_wait = 0;
        }

        // flush window probing commands
        if (self.probe & KCP_ASK_SEND) != 0 {
            seg.cmd = KCP_CMD_WASK;
            if self.buffer.remaining_mut() + KCP_OVERHEAD > self.mtu {
                self.output.write_all(&self.buffer);
                self.buffer.clear();
            }
            seg.encode(&mut self.buffer);
        }

        // flush window probing commands
        if (self.probe & KCP_ASK_TELL) != 0 {
            seg.cmd = KCP_CMD_WINS;
            if self.buffer.remaining_mut() + KCP_OVERHEAD > self.mtu {
                self.output.write_all(&self.buffer);
                self.buffer.clear();
            }
            seg.encode(&mut self.buffer);
        }
        self.probe = 0;

        // calculate window size
        let mut cwnd = cmp::min(self.snd_wnd, self.rmt_wnd);
        if !self.nocwnd {
            cwnd = cmp::min(self.cwnd, cwnd);
        }

        // move data from snd_queue to snd_buf
        while self.snd_nxt < self.snd_una + cwnd {
            if let Some(mut newseg) = self.snd_queue.pop_front() {
                newseg.conv = self.conv;
                newseg.cmd = KCP_CMD_PUSH;
                newseg.wnd = seg.wnd;
                newseg.ts = current;
                newseg.sn = self.snd_nxt;
                self.snd_nxt += 1;
                newseg.una = self.rcv_nxt;
                newseg.resendts = current;
                newseg.rto = self.rx_rto;
                newseg.fastack = 0;
                newseg.xmit = 0;
                self.snd_buf.push_back(newseg);
            } else {
                break;
            }
        }
        // calculate resent
        let resent = if self.fastresend > 0 {
            self.fastresend
        } else {
            u32::max_value()
        };
        let rtomin = if self.nodelay == 0 {
            self.rx_rto >> 3
        } else {
            0
        };

        // flush data segments
        for segment in &mut self.snd_buf {
            let mut needsend = false;
            if segment.xmit == 0 {
                needsend = true;
                segment.xmit += 1;
                segment.rto = self.rx_rto;
                segment.resendts = current + segment.rto + rtomin;
            } else if timediff(current, segment.resendts) >= 0 {
                needsend = true;
                segment.xmit += 1;
                self.xmit += 1;
                if self.nodelay == 0 {
                    segment.rto += self.rx_rto;
                } else {
                    segment.rto += self.rx_rto / 2;
                }
                segment.resendts = current + segment.rto;
                lost = true;
            } else if segment.fastack >= resent {
                needsend = true;
                segment.xmit += 1;
                segment.fastack = 0;
                segment.resendts = current + segment.rto;
                change = true;
            }

            if needsend {
                segment.ts = current;
                segment.wnd = seg.wnd;
                segment.una = self.rcv_nxt;

                let len = segment.data.len();
                let need = KCP_OVERHEAD + len;

                if self.buffer.remaining_mut() + need > self.mtu {
                    self.output.write_all(&self.buffer);
                    self.buffer.clear();
                }
                segment.encode(&mut self.buffer);

                // never used
                // if segment.xmit >= self.dead_link {
                //     self.state = -1;
                // }
            }
        }

        // flash remain segments
        if self.buffer.remaining_mut() > 0 {
            self.output.write_all(&self.buffer);
            self.buffer.clear();
        }

        // update ssthresh
        if change {
            let inflight = self.snd_nxt - self.snd_una;
            self.ssthresh = inflight / 2;
            if self.ssthresh < KCP_THRESH_MIN {
                self.ssthresh = KCP_THRESH_MIN;
            }
            self.cwnd = self.ssthresh + resent;
            self.incr = self.cwnd * self.mss as u32;
        }

        if lost {
            self.ssthresh = cwnd / 2;
            if self.ssthresh < KCP_THRESH_MIN {
                self.ssthresh = KCP_THRESH_MIN;
            }
            self.cwnd = 1;
            self.incr = self.mss as u32;
        }

        if self.cwnd < 1 {
            self.cwnd = 1;
            self.incr = self.mss as u32;
        }
    }

    /// update state (call it repeatedly, every 10ms-100ms), or you can ask
    /// `check` when to call it again (without `input`/`send` calling).
    /// `current` - current timestamp in millisec.
    pub fn update(&mut self, current: u32) {
        self.current = current;
        if !self.updated {
            self.updated = true;
            self.ts_flush = self.current;
        }
        let mut slap = timediff(self.current, self.ts_flush);

        if slap >= 10000 || slap < -10000 {
            self.ts_flush = self.current;
            slap = 0;
        }

        if slap >= 0 {
            self.ts_flush += self.interval;
            if timediff(self.current, self.ts_flush) >= 0 {
                self.ts_flush = self.current + self.interval;
            }
            self.flush();
        }
    }

    /// Determine when should you invoke `update`:
    /// returns when you should invoke `update` in millisec, if there
    /// is no `input`/`send` calling. you can call `update` in that
    /// time, instead of call `update` repeatly.
    /// Important to reduce unnacessary `update` invoking. use it to
    /// schedule `update` (eg. implementing an epoll-like mechanism,
    /// or optimize `update` when handling massive kcp connections)
    pub fn check(&self, current: u32) -> u32 {
        if !self.updated {
            return 0;
        }

        let mut ts_flush = self.ts_flush;
        let mut tm_packet = u32::max_value();

        if timediff(current, ts_flush) >= 10000 || timediff(current, ts_flush) < -10000 {
            ts_flush = current;
        }

        if timediff(current, ts_flush) >= 0 {
            return 0;
        }

        let tm_flush = timediff(ts_flush, current) as u32;
        for seg in &self.snd_buf {
            let diff = timediff(seg.resendts, current);
            if diff <= 0 {
                return 0;
            }
            if (diff as u32) < tm_packet {
                tm_packet = diff as u32;
            }
        }

        let minimal = cmp::min(cmp::min(tm_packet, tm_flush), self.interval);

        minimal
    }

    /// change MTU size, default is 1400
    pub fn setmtu(&mut self, mtu: usize) -> bool {
        if mtu < 50 || mtu < KCP_OVERHEAD {
            return false;
        }
        self.mtu = mtu;
        self.mss = self.mtu - KCP_OVERHEAD;
        let additional = (mtu + KCP_OVERHEAD) * 3 - self.buffer.capacity();
        if additional > 0 {
            self.buffer.reserve(additional);
        }
        true
    }

    /// fastest: nodelay(1, 20, 2, 1)
    /// `nodelay`: 0:disable(default), 1:enable
    /// `interval`: internal update timer interval in millisec, default is 100ms
    /// `resend`: 0:disable fast resend(default), 1:enable fast resend
    /// `nc`: false:normal congestion control(default), true:disable congestion control
    pub fn nodelay(&mut self, nodelay: i32, interval: i32, resend: i32, nc: bool) {
        if nodelay >= 0 {
            let nodelay = nodelay as u32;
            self.nodelay = nodelay;
            if nodelay > 0 {
                self.rx_minrto = KCP_RTO_NDL;
            } else {
                self.rx_minrto = KCP_RTO_MIN;
            }
        }
        if interval >= 0 {
            let mut interval = interval as u32;
            if interval > 5000 {
                interval = 5000;
            } else if interval < 10 {
                interval = 10;
            }
            self.interval = interval;
        }
        if resend >= 0 {
            self.fastresend = resend as u32;
        }
        self.nocwnd = nc;
    }

    /// set maximum window size: `sndwnd`=32, `rcvwnd`=32 by default
    pub fn wndsize(&mut self, sndwnd: i32, rcvwnd: i32) {
        if sndwnd > 0 {
            self.snd_wnd = sndwnd as u32;
        }
        if rcvwnd > 0 {
            self.rcv_wnd = rcvwnd as u32;
        }
    }

    /// get how many packet is waiting to be sent
    pub fn waitsnd(&self) -> usize {
        self.snd_buf.len() + self.snd_queue.len()
    }
}

#[inline]
fn timediff(later: u32, earlier: u32) -> i32 {
    later as i32 - earlier as i32
}

#[inline]
fn bound(lower: u32, v: u32, upper: u32) -> u32 {
    cmp::min(cmp::max(lower, v), upper)
}

kcb.rs

This topic was automatically closed 90 days after the last reply. New replies are no longer allowed.