I try to create server with tokio
and tokio_io
. Server should able to receive datagram in special format:
{some number: u64} {space: ' '} {some data: String}. So I created module codec.rs
:
use tokio_io::codec::{Encoder, Decoder};
use bytes::{BytesMut, Bytes, BufMut};
use std::io;
use std::char;
pub struct MyData(pub u64, pub String);
#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub struct MyFrame;
impl Encoder for MyFrame {
type Item = Bytes;
type Error = io::Error;
fn encode(
&mut self,
data: Self::Item,
dst: &mut BytesMut
) -> Result<(), Self::Error> {
dst.reserve(data.len());
dst.put(data);
Ok(())
}
}
impl Decoder for MyFrame {
type Item = MyData;
type Error = io::Error;
fn decode(
&mut self,
src: &mut BytesMut
) -> Result<Option<Self::Item>, Self::Error> {
let mut delimeter: usize = 0;
for &item in src.iter() {
if item as char == ' ' {
let (data1, data2) = src.split_at(delimeter);
let info = String::from_utf8(data2.to_vec()).unwrap();
let id = String::from_utf8(data1.to_vec()).unwrap().parse::<u64>().unwrap();
return Ok(Some(MyData(id, info)));
}
delimeter += 1;
}
Ok(None)
}
}
And here is my module server.rs
:
use tokio_core::reactor::Core;
use tokio_io::codec::BytesCodec;
use tokio::net::{TcpListener, TcpStream};
use tokio::prelude::*;
use bytes::{BytesMut, Bytes, BufMut};
use futures_cpupool::{CpuPool, CpuFuture};
use codec::{MyFrame, MyData};
use std::env;
use std::net::SocketAddr;
use std::str;
use std::io;
use std;
#[allow(dead_code)]
pub fn run() {
let mut core = Core::new().unwrap();
let handle = core.handle();
let pool = CpuPool::new_num_cpus();
let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string());
let addr = addr.parse::<SocketAddr>().unwrap();
let socket = TcpListener::bind(&addr).unwrap();
println!("Listening on: {}", addr);
let server = socket
.incoming()
.map_err(|e| println!("failed to accept socket; error = {:?}", e))
.for_each( move |socket| {
handle.spawn(work(socket, &pool).then(|_result| {
Ok(())
}));
Ok(())
});
core.run(server).unwrap();
}
fn work(socket: TcpStream, pool: &CpuPool)
-> Box< CpuFuture<(), io::Error> > {
println!("New connection from {}", socket.peer_addr().unwrap());
// let framed = socket.framed(BytesCodec::new());
let framed = socket.framed(MyFrame::new());
let (_writer, reader) = framed.split();
let proc1 = |frame: BytesMut| -> Result<(), io::Error> {
match str::from_utf8(&frame[..]) {
Ok(data) => {
print!("{}", data);
Ok(())
}
Err(_) => Ok(()),
}
};
let proc2 = |frame: MyData| -> Result<(), io::Error> {
println!("{}, {}", frame.0, frame.1);
Ok(())
};
let processor = reader
.for_each(proc2)
.and_then(|()| {
println!("Socket received FIN packet and closed connection");
Ok(())
})
.or_else(|err| {
println!("Socket closed with error: {:?}", err);
Err(err)
})
.then(|result| {
println!("Socket closed with result: {:?}", result);
Ok(())
});
Box::new(pool.spawn(processor))
}
Then I start to run my server and connect to it with telnet
. But when I send some line with view, for example:
123 qwerty
I see how my server receives this line and print it going in cycles. It happens when I use my closure proc2
and MyFrame
. But when I use closure proc1
and BytesCodec
server works properly and print the received line only when I realy send it with telnet
without going in cycles.
I don't understand why using the handler proc2
and MyFrame
leads to server goes to print line in cycles... Could anybody help me to understand why it happens?