A stream-style communication from a datagram-style communication

Given a sender and a receiver of byte "datagrams" (messages with clear boundaries):

receiver: Stream<Item=Vec<u8>, Error=()>
sender: Sink<SinkItem=Vec<u8>, SinkError=()>

I want to create a stream-like communication (like TcpStream), where messages don't have clear boundaries.
In other words, I want the inverse operation of tokio_codec::Decoder::framed.

My main questions are:

  1. Do you know of some function inside Tokio, or even in an external lib, that does what I'm trying to do?
  2. Any good ideas about how to implement this?

I realized that I should probably implement std::io::Read and std::io::Write for some structure that contains both the receiver and the sender, and hopefully AsyncRead and AsyncWrite will be automatically implemented for this struct.
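
If I read the tokio-io 0.1 traits correctly, this is not fully automatic, but close: AsyncRead consists only of default methods, while AsyncWrite additionally requires shutdown(). A sketch of the shape I have in mind (DatagramIo is a made-up name, and the buffering details are elided):

use std::io;
use futures::{Async, Poll};
use tokio_io::{AsyncRead, AsyncWrite};

// Made-up wrapper owning both halves of the datagram channel.
struct DatagramIo<R, S> {
    receiver: R, // the Stream half
    sender: S,   // the Sink half
    // ... plus buffers for partially consumed / partially sent frames
}

// AsyncRead has only default methods, so an empty impl is enough
// once io::Read is implemented.
impl<R, S> AsyncRead for DatagramIo<R, S> where DatagramIo<R, S>: io::Read {}

impl<R, S> AsyncWrite for DatagramIo<R, S>
where
    DatagramIo<R, S>: io::Write,
{
    // shutdown() is the only required method of AsyncWrite.
    fn shutdown(&mut self) -> Poll<(), io::Error> {
        // A real implementation should close the underlying Sink first.
        Ok(Async::Ready(()))
    }
}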

I began writing a short piece of code attempting to demonstrate what I'm trying to do (this code only deals with the receiving side):

use std::{cmp, io};
use futures::{Async, Stream};

struct StreamReceiver<M> {
    opt_receiver: Option<M>,
    pending_in: Vec<u8>,
}

impl<M> StreamReceiver<M> {
    fn new(receiver: M) -> Self {
        StreamReceiver {
            opt_receiver: Some(receiver),
            pending_in: Vec::new(),
        }
    }
}

impl<M> io::Read for StreamReceiver<M> 
where
    M: Stream<Item=Vec<u8>, Error=()>,
{
    fn read(&mut self, mut buf: &mut [u8]) -> io::Result<usize> {
        let mut total_read = 0; // Total number of bytes read so far
        loop {
            // pending_in --> buf (As many bytes as possible)
            let min_len = cmp::min(buf.len(), self.pending_in.len());
            buf[.. min_len].copy_from_slice(&self.pending_in[.. min_len]);
            let _ = self.pending_in.drain(.. min_len);
            buf = &mut buf[min_len ..];
            total_read += min_len;

            // Stop once the caller's buffer is full (or was empty to begin
            // with); leftover frame data stays in pending_in for next time.
            if !self.pending_in.is_empty() || buf.is_empty() {
                return Ok(total_read);
            }

            // take() the receiver so that, once the stream terminates, it is
            // dropped for good and every later read returns Ok(0) (EOF).
            match self.opt_receiver.take() {
                Some(mut receiver) => {
                    match receiver.poll() {
                        Ok(Async::Ready(Some(data))) => {
                            self.opt_receiver = Some(receiver);
                            self.pending_in = data;
                        },
                        Ok(Async::Ready(None)) => return Ok(total_read), // End of incoming data
                        Ok(Async::NotReady) => {
                            self.opt_receiver = Some(receiver);
                            if total_read > 0 {
                                return Ok(total_read)
                            } else {
                                return Err(io::Error::new(io::ErrorKind::WouldBlock, "WouldBlock"))
                            }
                        },
                        Err(()) => return Err(io::Error::new(io::ErrorKind::BrokenPipe, "BrokenPipe")),
                    };
                },
                None => return Ok(total_read),
            }
        }
    }
}

Some notes about the code:

  • This code compiles, but I haven't tested it, so it might be incorrect in some cases.
  • I am worried that I am mixing things from two worlds here: the async world (futures) and the synchronous world of std::io. Maybe something is fundamentally wrong here.
  • I'm not yet sure what the right way to test this kind of code would be.
  • I am aware that things could probably be more efficient with Bytes instead of Vec<u8>, but I can optimize that later.
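
For symmetry, here is an equally untested sketch of how the sending half might look (StreamSender is just a placeholder name; start_send and poll_complete are the futures 0.1 Sink API):

use std::io;
use futures::{Async, AsyncSink, Sink};

struct StreamSender<K> {
    sender: K,
}

impl<K> io::Write for StreamSender<K>
where
    K: Sink<SinkItem = Vec<u8>, SinkError = ()>,
{
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        // Every write() call becomes one datagram on the underlying Sink.
        match self.sender.start_send(buf.to_vec()) {
            Ok(AsyncSink::Ready) => Ok(buf.len()),
            // The sink cannot accept an item right now; report WouldBlock,
            // just like the receiving side does.
            Ok(AsyncSink::NotReady(_)) => {
                Err(io::Error::new(io::ErrorKind::WouldBlock, "WouldBlock"))
            }
            Err(()) => Err(io::Error::new(io::ErrorKind::BrokenPipe, "BrokenPipe")),
        }
    }

    fn flush(&mut self) -> io::Result<()> {
        // Drive the Sink until all buffered datagrams have been sent out.
        match self.sender.poll_complete() {
            Ok(Async::Ready(())) => Ok(()),
            Ok(Async::NotReady) => Err(io::Error::new(io::ErrorKind::WouldBlock, "WouldBlock")),
            Err(()) => Err(io::Error::new(io::ErrorKind::BrokenPipe, "BrokenPipe")),
        }
    }
}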

Any help is appreciated!

Have you considered Stream::forward (futures::stream::Stream - Rust)?

Hi @vitalyd, I know about forward(), but I don't think it will solve my problem here.
I should probably explain the bigger picture of what I'm trying to do.

I wrote an encryption/decryption layer.
I begin with a TCP connection: TcpStream. Then I use the framed() method together with an Encoder/Decoder to split it into frames. This gives a (reader, writer) pair of Vec<u8> frames.
The encryption code then takes the (reader, writer) pair and returns a new pair, (enc_reader, enc_writer), also of Vec<u8> frames.

Picture:

TcpStream --[framed()]--> (reader, writer) --[encryption]--> (enc_reader, enc_writer)

I am missing this final step:

(enc_reader, enc_writer) --[???]--> EncryptedTcpStream

Where EncryptedTcpStream has the same interface as TcpStream (Implements both AsyncRead and AsyncWrite).
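
In code, the existing part of the pipeline looks roughly like this (a sketch only: LengthDelimitedCodec stands in for my real codec, and it yields BytesMut rather than Vec<u8> frames):

use futures::Stream;
use tokio::net::TcpStream;
use tokio_codec::{Decoder, LengthDelimitedCodec};

fn layering(tcp_stream: TcpStream) {
    // TcpStream --[framed()]--> one value that is both Stream and Sink
    let framed = LengthDelimitedCodec::new().framed(tcp_stream);
    // split() (from futures 0.1) --> (Sink half, Stream half)
    let (writer, reader) = framed.split();
    // The encryption layer wraps these into (enc_reader, enc_writer);
    // the missing step is going from that pair back to a single value
    // implementing AsyncRead + AsyncWrite.
    let _ = (writer, reader);
}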

Why do I want to go back to EncryptedTcpStream? Because the encryption process limits the maximum size of a frame. For example, the encryption protocol sends data in frames of size 0x100; if I ever want to send a frame larger than 0x100, I won't be able to do it using enc_writer.

I see. I think I get the gist now.

It seems like you could have a custom Encoder impl that does the buffering for you if a block is larger than, e.g., 0x100. In other words, the encoder's output is a Frame256 (I'm just picking a name), which is a wrapper over 256 bytes. The encrypted stream is then a Sink that takes Frame256 values, and you forward Frame256 values into it from your underlying Vec<u8> stream, with this buffering encoder in the middle.
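
A rough, untested sketch of that idea (ChunkingEncoder is an illustrative name; the trait is tokio-codec 0.1's Encoder, and a real protocol would also write a length prefix so the decoder can recover the chunk boundaries):

use bytes::BytesMut;
use std::io;
use tokio_codec::Encoder;

// Never emits a frame larger than 0x100 bytes.
struct ChunkingEncoder;

impl Encoder for ChunkingEncoder {
    type Item = Vec<u8>;
    type Error = io::Error;

    fn encode(&mut self, item: Vec<u8>, dst: &mut BytesMut) -> io::Result<()> {
        for chunk in item.chunks(0x100) {
            // A length prefix (or fixed-size padding) would go here.
            dst.extend_from_slice(chunk);
        }
        Ok(())
    }
}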

Before going further, does that sound sensible or am I still missing something crucial?

You could take a look at https://github.com/Nemo157/libp2p-secio-rs/blob/master/src/secstream.rs. I haven't touched this code in a long time, so it may be out of date with the latest Tokio best practices, but it is doing exactly the same sort of layering you're describing.

@Nemo157: I think that this is exactly what I was looking for. The code at https://github.com/Nemo157/libp2p-secio-rs/blob/master/src/secstream.rs#L58 really looks like the code I posted above. Thanks for that!

@vitalyd: My main issue was how to implement Write, Read, AsyncWrite and AsyncRead correctly; I was not sure whether it is reasonable to mix futures code with std::io::Write and std::io::Read implementations.
I agree with you about the idea of buffering; this is basically what I was trying to do with the code I sent. I will apply the idea you suggested to my implementation of std::io::Write, along the lines of the sketch below.
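
Concretely, assuming the StreamSender sketch from earlier, I think write() only needs to cap each datagram at the maximum frame size (0x100 being the example size from above):

// Inside the hypothetical impl<K> io::Write for StreamSender<K> block:
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
    // Send at most one maximum-size frame per call; callers that use
    // write_all() will loop until the whole buffer has been sent.
    let chunk = &buf[..std::cmp::min(buf.len(), 0x100)];
    match self.sender.start_send(chunk.to_vec()) {
        Ok(AsyncSink::Ready) => Ok(chunk.len()),
        Ok(AsyncSink::NotReady(_)) => {
            Err(io::Error::new(io::ErrorKind::WouldBlock, "WouldBlock"))
        }
        Err(()) => Err(io::Error::new(io::ErrorKind::BrokenPipe, "BrokenPipe")),
    }
}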