Need help completing a simple echo server using futures' Stream and Sink

Hello,
I'm learning futures, and to understand them better I set up a small project: an echo server built on Stream and Sink.
Unfortunately, I'm confused about Sink, and I couldn't find a good example or tutorial online.
I don't know how a Sink should be handled or what its lifetime is.
Is there a better (asynchronous) alternative to tx.send(msg).wait();?
Could someone please help me finish this small project correctly? I think many users are as confused about Sink as I am.

use tokio::net::TcpListener;
use tokio::prelude::*;
use tokio::sync::mpsc;

fn main() {
    let addr = "127.0.0.1:12345".parse().unwrap();
    let listener = TcpListener::bind(&addr).unwrap();

    let server = listener
        .incoming()
        .for_each(move |socket| {
            let (command_tx, command_rx) = mpsc::channel::<Vec<u8>>(1000);
            let sh = Echo::new(socket);
            let (network_sender, network_receiver) = sh.split();
            let receiver_future = network_receiver.for_each(move |msg| {
                let tx = command_tx.clone();
                tx.send(msg).wait();
                Ok(())
            });

            let client_to_tcp = command_rx
                .map_err(|_| ())
                .and_then(|p| {
                    println!("{:?}", p);
                    Ok(p)
                })
                .forward(network_sender)
                .then(|_| Ok(()));

            tokio::spawn(
                receiver_future
                    .select(client_to_tcp)
                    .map(|_| ())
                    .map_err(|_| ()),
            );

            Ok(())
        })
        .map_err(|_| ());
    tokio::run(server);
}

pub struct Echo {
    server_reader: Option<tokio::io::ReadHalf<tokio::net::TcpStream>>,
    server_writer: Option<tokio::io::WriteHalf<tokio::net::TcpStream>>,
    read_buf: Box<[u8]>,
}

impl Echo {
    pub fn new(server: tokio::net::TcpStream) -> Echo {
        let (sr, sw) = server.split();
        Echo {
            server_reader: Some(sr),
            server_writer: Some(sw),
            read_buf: Box::new([0; 2048]),
        }
    }
}

impl Stream for Echo {
    type Item = Vec<u8>;
    type Error = ();
    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
        let src = self.server_reader.as_mut().unwrap();
        match src.poll_read(&mut self.read_buf).map_err(|_| ()) {
            Ok(futures::Async::Ready(t)) => {
                if t == 0 {
                    Ok(futures::Async::Ready(None))
                } else {
                    Ok(futures::Async::Ready(Some(
                        self.read_buf[..t].to_vec(),
                    )))
                }
            }
            Ok(futures::Async::NotReady) => Ok(futures::Async::NotReady),
            Err(e) => Err(From::from(e)),
        }
    }
}

impl Sink for Echo {
    type SinkItem = Vec<u8>;
    type SinkError = ();
    fn start_send(
        &mut self,
        item: Self::SinkItem,
    ) -> futures::StartSend<Self::SinkItem, Self::SinkError> {
        dbg!("test");
        unimplemented!()
    }
    fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
        Ok(futures::Async::NotReady)
    }
}

Cargo.toml:

[package]
name = "echo_example"
version = "0.0.1"
authors = ["blah <blah@mail.com>"]
edition = "2018"

[dependencies]
futures = "0.1"
tokio = "0.1"

Many thanks.

This one should implement Sink. Normally you can look at the required methods in the documentation for Sink and just forward all the calls to this one.

Oops, sorry, I answered too fast. So you just have a TcpStream. Normally that doesn't implement Stream and Sink, because those tend to work on messages. You could make a Stream and Sink with item type u8, but that would be a bit bizarre. With a raw byte stream you would usually work with the Write and AsyncWrite traits. If you want to send messages, you would use a tokio Codec to "frame" the stream. Tokio has examples of how to do that, but it comes down to (sink, stream) = codec.framed( stream ).split();

The framed halves implement Sink and Stream. You could then easily make an echo server by forwarding the calls to the inner object. Otherwise, if this is for learning purposes, you can implement Sink/Stream on u8. Just look at the required methods and try to implement them. I'm not quite sure what the status of TcpStream is for poll_ready... and flush.
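
To show what "framing" means here, this is a minimal std-only sketch, not tokio's codec API. LineFramer is a hypothetical name, and the newline delimiter is an assumption chosen for illustration. It only shows the idea a codec implements: bytes accumulate in a buffer, and a message comes out once a whole frame has arrived.

```rust
// Std-only sketch of what a codec does: turn a raw byte buffer into whole
// messages and back. `LineFramer` is a hypothetical name, not a tokio type.
pub struct LineFramer;

impl LineFramer {
    /// Removes and returns the next newline-terminated frame from `buf`
    /// (without the trailing b'\n'), or `None` if no complete frame has
    /// arrived yet. Bytes of an incomplete frame stay in `buf`.
    pub fn decode(&mut self, buf: &mut Vec<u8>) -> Option<Vec<u8>> {
        let pos = buf.iter().position(|&b| b == b'\n')?;
        // Take the frame plus its delimiter out of the buffer.
        let mut frame: Vec<u8> = buf.drain(..=pos).collect();
        frame.pop(); // drop the delimiter
        Some(frame)
    }

    /// Appends one message plus the delimiter to the outgoing buffer.
    pub fn encode(&mut self, msg: &[u8], out: &mut Vec<u8>) {
        out.extend_from_slice(msg);
        out.push(b'\n');
    }
}
```

With something like this, a stream built on top yields one item per message rather than whatever chunk a single read happened to return, which is why Stream and Sink fit framed data better than raw bytes.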

Thank you for your valuable response.
First of all, I'd prefer to complete a simple example with Sink/Stream on u8, and after that I will try framed.
In fact I did complete this example, but I have two questions about it.

use tokio::net::TcpListener;
use tokio::prelude::*;
use tokio::sync::mpsc;

fn main() {
    let addr = "127.0.0.1:12345".parse().unwrap();
    let listener = TcpListener::bind(&addr).unwrap();

    let server = listener
        .incoming()
        .for_each(move |socket| {
            let (command_tx, command_rx) = mpsc::channel::<Vec<u8>>(1000);
            let sh = Echo::new(socket);
            let (network_sender, network_receiver) = sh.split();
            let receiver_future = network_receiver.for_each(move |msg| {
                let tx = command_tx.clone();
                tx.send(msg).wait(); // Question 1: is wait a good option in this case?
                Ok(())
            });

            let client_to_tcp = command_rx
                .map_err(|_| ())
                .and_then(|p| {
                    println!("{:?}", p);
                    Ok(p)
                })
                .forward(network_sender)
                .then(|_| Ok(()));

            tokio::spawn(
                receiver_future
                    .select(client_to_tcp)
                    .map(|_| ())
                    .map_err(|_| ()),
            );

            Ok(())
        })
        .map_err(|_| ());
    tokio::run(server);
}

pub struct Echo {
    server_reader: Option<tokio::io::ReadHalf<tokio::net::TcpStream>>,
    server_writer: Option<tokio::io::WriteHalf<tokio::net::TcpStream>>,
    read_buf: Box<[u8]>,
}

impl Echo {
    pub fn new(server: tokio::net::TcpStream) -> Echo {
        let (sr, sw) = server.split();
        Echo {
            server_reader: Some(sr),
            server_writer: Some(sw),
            read_buf: Box::new([0; 2048]),
        }
    }
}

impl Stream for Echo {
    type Item = Vec<u8>;
    type Error = ();
    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
        let src = self.server_reader.as_mut().unwrap();
        match src.poll_read(&mut self.read_buf).map_err(|_| ()) {
            Ok(futures::Async::Ready(t)) => {
                if t == 0 {
                    Ok(futures::Async::Ready(None))
                } else {
                    Ok(futures::Async::Ready(Some(self.read_buf[..t].to_vec())))
                }
            }
            Ok(futures::Async::NotReady) => Ok(futures::Async::NotReady),
            Err(e) => Err(From::from(e)),
        }
    }
}

impl Sink for Echo {
    type SinkItem = Vec<u8>;
    type SinkError = ();
    fn start_send(
        &mut self,
        item: Self::SinkItem,
    ) -> futures::StartSend<Self::SinkItem, Self::SinkError> {
        let writer = self.server_writer.as_mut().unwrap();
        match writer.poll_write(&item.clone()) {
            Ok(futures::Async::Ready(t)) => t,
            // Ignore the problems here; this case must be handled correctly.
            Ok(futures::Async::NotReady) => return Ok(futures::AsyncSink::NotReady(item)),
            Err(_) => return Err(()),
        };
        Ok(futures::AsyncSink::Ready)
    }
    // Question 2: in this case, how must I handle poll_complete?
    fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
        Ok(futures::Async::Ready(()))
    }
}

I have marked my questions in the code.
Further questions:

  1. How can I close the connection manually?
  2. What problems exist in the above code?

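wait will block, which is usually to be avoided in async code. That's why the for_each method takes a closure that produces a future: this way, you don't need to block. Returning tx.send(msg) from the closure will be just fine, because the Send it returns implements Future. You may still need to map its item and error types to what for_each expects, e.g. with .map(|_| ()) and .map_err(|_| ()).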

I'll have a look at your second question.

The docs of poll_complete seem quite clear to me. If there is buffering going on, you have to flush now. That means you should look at the docs of the TcpStream you are wrapping to see if there are any methods you can call to tell it to flush. I don't think there are any, but I'll leave it to you to check the docs. In that case, if you don't buffer, just return Ready.
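
To make the buffering case concrete, here is a minimal std-only sketch, not the futures 0.1 API itself. Async and AsyncSink are local stand-ins that mirror the shape of the futures 0.1 enums, and BufferedSink is a hypothetical type. It assumes the sink buffers in start_send, so poll_complete has to write and flush everything before it may report Ready. In real tokio code, poll_write would also arrange a task wakeup when it returns NotReady; this sketch doesn't model that.

```rust
use std::io::{self, Write};

// Local stand-ins mirroring the shape of futures 0.1's `Async` and
// `AsyncSink`, defined here only so the sketch needs no external crates.
#[derive(Debug, PartialEq)]
pub enum Async<T> { Ready(T), NotReady }

#[derive(Debug, PartialEq)]
pub enum AsyncSink<T> { Ready, NotReady(T) }

/// Hypothetical sink that buffers items and writes them in `poll_complete`.
pub struct BufferedSink<W: Write> {
    writer: W,
    pending: Vec<u8>, // bytes accepted by `start_send` but not yet written
    capacity: usize,
}

impl<W: Write> BufferedSink<W> {
    pub fn new(writer: W, capacity: usize) -> Self {
        BufferedSink { writer, pending: Vec::new(), capacity }
    }

    /// Accepts the item if the buffer is empty or has room for it;
    /// otherwise hands the item back so the caller can retry later.
    pub fn start_send(&mut self, item: Vec<u8>) -> io::Result<AsyncSink<Vec<u8>>> {
        if !self.pending.is_empty() && self.pending.len() + item.len() > self.capacity {
            return Ok(AsyncSink::NotReady(item));
        }
        self.pending.extend_from_slice(&item);
        Ok(AsyncSink::Ready)
    }

    /// Writes as much buffered data as the writer takes. Reports `Ready`
    /// only once the buffer is empty and the writer has been flushed.
    pub fn poll_complete(&mut self) -> io::Result<Async<()>> {
        while !self.pending.is_empty() {
            match self.writer.write(&self.pending) {
                Ok(0) => return Err(io::ErrorKind::WriteZero.into()),
                // Partial writes are fine: drop only what was written.
                Ok(n) => { self.pending.drain(..n); }
                // A non-blocking writer signals "try again later" this way.
                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
                    return Ok(Async::NotReady);
                }
                Err(e) => return Err(e),
            }
        }
        self.writer.flush()?;
        Ok(Async::Ready(()))
    }

    /// Returns the wrapped writer, discarding any unwritten bytes.
    pub fn into_inner(self) -> W {
        self.writer
    }
}
```

If the sink writes directly in start_send and keeps nothing back, there is nothing left for poll_complete to do and returning Ready immediately is consistent with this model. Note that a partial write in start_send would then need the same "keep the remainder" handling shown in the loop above.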

For the additional question, I don't have time right now to do a full review... sorry.
