Server goes into an uncontrolled loop

I am trying to create a server with tokio and tokio_io. The server should be able to receive datagrams in a special format:
{some number: u64} {space: ' '} {some data: String}. So I created the module codec.rs:

use tokio_io::codec::{Encoder, Decoder};
use bytes::{BytesMut, Bytes, BufMut};
use std::io;
use std::char;

pub struct MyData(pub u64, pub String);

#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub struct MyFrame;

impl Encoder for MyFrame {

    type Item = Bytes;
    type Error = io::Error;

    fn encode(
        &mut self,
        data: Self::Item,
        dst: &mut BytesMut
    ) -> Result<(), Self::Error> {

        dst.reserve(data.len());
        dst.put(data);

        Ok(())
    }
}

impl Decoder for MyFrame {

    type Item = MyData;
    type Error = io::Error;

    fn decode(
        &mut self,
        src: &mut BytesMut
    ) -> Result<Option<Self::Item>, Self::Error> {

        let mut delimeter: usize = 0;

        for &item in src.iter() {

            if item as char == ' ' {
                let (data1, data2) = src.split_at(delimeter);

                let info = String::from_utf8(data2.to_vec()).unwrap();
                let id = String::from_utf8(data1.to_vec()).unwrap().parse::<u64>().unwrap();

                return Ok(Some(MyData(id, info)));
            }

            delimeter += 1;
        }

        Ok(None)
    }
}

And here is my module server.rs:

use tokio_core::reactor::Core;
use tokio_io::codec::BytesCodec;
use tokio::net::{TcpListener, TcpStream};
use tokio::prelude::*;
use bytes::{BytesMut, Bytes, BufMut};
use futures_cpupool::{CpuPool, CpuFuture};

use codec::{MyFrame, MyData};
use std::env;
use std::net::SocketAddr;
use std::str;
use std::io;
use std;

#[allow(dead_code)]
pub fn run() {

    let mut core = Core::new().unwrap();
    let handle = core.handle();

    let pool = CpuPool::new_num_cpus();

    let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string());
    let addr = addr.parse::<SocketAddr>().unwrap();

    let socket = TcpListener::bind(&addr).unwrap();
    println!("Listening on: {}", addr);

    let server = socket
        .incoming()
        .map_err(|e| println!("failed to accept socket; error = {:?}", e))
        .for_each( move |socket| {

            handle.spawn(work(socket, &pool).then(|_result| {
                Ok(())
            }));

            Ok(())
        });

    core.run(server).unwrap();
}

fn work(socket: TcpStream, pool: &CpuPool)
        -> Box< CpuFuture<(), io::Error> > {
    println!("New connection from {}", socket.peer_addr().unwrap());

//    let framed = socket.framed(BytesCodec::new());
    let framed = socket.framed(MyFrame); // MyFrame is a unit struct, so there is no new()
    let (_writer, reader) = framed.split();

    let proc1 = |frame: BytesMut| -> Result<(), io::Error> {
        match str::from_utf8(&frame[..]) {
            Ok(data) => {
                print!("{}", data);

                Ok(())
            }
            Err(_) => Ok(()),
        }
    };

    let proc2 = |frame: MyData| -> Result<(), io::Error> {
        println!("{}, {}", frame.0, frame.1);
        Ok(())
    };

    let processor = reader
        .for_each(proc2)
        .and_then(|()| {
            println!("Socket received FIN packet and closed connection");
            Ok(())
        })
        .or_else(|err| {
            println!("Socket closed with error: {:?}", err);
            Err(err)
        })
        .then(|result| {
            println!("Socket closed with result: {:?}", result);
            Ok(())
        });

    Box::new(pool.spawn(processor))
}

Then I run my server and connect to it with telnet. But when I send a line such as:
123 qwerty
I see that my server receives this line and prints it over and over in a loop. This happens when I use my closure proc2 with MyFrame. But when I use the closure proc1 with BytesCodec, the server works properly and prints the received line only once, when I actually send it with telnet.

I don't understand why using the handler proc2 with MyFrame makes the server print the line in a loop. Could anybody help me understand why this happens?

I solved my problem by changing the decode(...) method:

fn decode(
        &mut self,
        src: &mut BytesMut
    ) -> Result<Option<Self::Item>, Self::Error> {

        let len = src.len();

        if len > 0 {
            let mut delimeter: usize = 0;

            let src = src.split_to(len);

            for &item in src.iter() {

                if item as char == ' ' {
                    let (data1, data2) = src.split_at(delimeter);

                    let info = String::from_utf8(data2.to_vec()).unwrap();

                    match String::from_utf8(data1.to_vec()).unwrap().parse::<u64>() {
                        Ok(id) => return Ok(Some(MyData(id, info))),
                        Err(_) => return Ok(None), // not a valid u64: drop this input
                    }
                }

                delimeter += 1;
            }
            Ok(None)
        } else {
            Ok(None)
        }
    }

I don't understand why it helps, but the idea was taken from BytesCodec.

The BytesMut needs to be consumed/advanced when you parse a frame out of it. split_at is a method on a slice, and the BytesMut doesn’t know that you’ve actually parsed the frame out because this operation is not visible to it (you’re doing it on a slice that it derefs to).
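To make the difference concrete, here is a minimal std-only sketch. It uses a plain Vec<u8> as a stand-in for BytesMut, and Vec::drain as a stand-in for a consuming call such as BytesMut::split_to; the helper names peek_front and take_front are made up for illustration and are not part of tokio or bytes.

```rust
/// Looks at the first `at` bytes through a slice.
/// `split_at` is a slice method, so the buffer keeps all of its bytes.
pub fn peek_front(buf: &Vec<u8>, at: usize) -> (Vec<u8>, usize) {
    let (head, _tail) = buf.split_at(at); // read-only view, nothing is removed
    (head.to_vec(), buf.len())
}

/// Removes the first `at` bytes from the buffer and returns them.
/// `drain` plays the role of a consuming operation here (an assumption
/// made for this sketch; with `BytesMut` it would be `split_to`).
pub fn take_front(buf: &mut Vec<u8>, at: usize) -> (Vec<u8>, usize) {
    let head: Vec<u8> = buf.drain(..at).collect();
    (head, buf.len())
}
```

Calling peek_front any number of times returns the same bytes and the same buffer length, which is what the decoder in the question ran into. After take_front, the buffer is shorter and the next read starts at fresh data.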

BytesMut::split_to, however, is a method on BytesMut and advances its internal indices, so it knows you’ve consumed the data up to the split point. Tokio will then only call you again when more bytes are available. In the split_at case, it just keeps telling you it has the same bytes over and over because it doesn’t know you’ve parsed them out already.
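As a rough sketch of a decoder that consumes exactly one frame per call, the code below again models the buffer with a std Vec<u8> instead of BytesMut. It assumes that a frame ends at a newline (telnet sends "\r\n" after each line), which the original post does not specify. The names decode_line and decode_all are hypothetical. Note that the workaround with split_to(len) consumes the whole buffer, so bytes of a frame that has only partly arrived would be lost; this version leaves them in place until the rest shows up.

```rust
/// Stand-in for the `MyData` type from the question.
#[derive(Debug, PartialEq)]
pub struct MyData(pub u64, pub String);

/// Tries to decode one `{u64} {text}\n` frame from the front of `buf`.
/// Returns `Ok(None)` and leaves `buf` untouched if no full line is there yet.
pub fn decode_line(buf: &mut Vec<u8>) -> Result<Option<MyData>, String> {
    // Assumption for this sketch: '\n' terminates a frame.
    let newline = match buf.iter().position(|&b| b == b'\n') {
        Some(i) => i,
        None => return Ok(None),
    };

    // Consume the line including its '\n'. With `BytesMut` this step
    // would be `src.split_to(newline + 1)`.
    let line: Vec<u8> = buf.drain(..=newline).collect();

    // `trim_end` strips the trailing "\r\n" (and any other trailing whitespace).
    let text = std::str::from_utf8(&line)
        .map_err(|e| e.to_string())?
        .trim_end();

    let (id, info) = text
        .split_once(' ')
        .ok_or_else(|| format!("no space in {:?}", text))?;
    let id = id.parse::<u64>().map_err(|e| e.to_string())?;

    Ok(Some(MyData(id, info.to_string())))
}

/// Roughly mimics how a framed reader drives a decoder:
/// keep calling it until it reports that it needs more bytes.
pub fn decode_all(buf: &mut Vec<u8>) -> Result<Vec<MyData>, String> {
    let mut frames = Vec::new();
    while let Some(frame) = decode_line(buf)? {
        frames.push(frame);
    }
    Ok(frames)
}
```

Because every successful call removes the bytes it parsed, the loop in decode_all terminates. If decode_line only inspected the buffer through a slice, the same loop would yield the same frame forever, which matches the behaviour described in the question.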
