Given a pair of sender and receiver of byte "datagrams" (messages have clear boundaries):
receiver: Stream<Item=Vec<u8>, Error=()>
sender: Sink<SinkItem=Vec<u8>, SinkError=()>
I want to create stream-like communication (like TcpStream), where messages don't have clear boundaries.
In other words, I want to get the inverse operation to TokioCodec::Decoder::framed.
My main questions are:
- Do you know of some function inside Tokio, or even in an external lib that does what I'm trying to do?
- Any good ideas about how to implement this.
I realized that I should probably implement std::io::Read and std::io::Write for some structure that contains both receiver and sender, and hopefully AsyncRead and AsyncWrite will be automatically implemented for this struct.
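To make the sending direction of this idea concrete, here is a minimal std-only sketch of turning a byte stream back into datagrams. It is not Tokio code: the Sink is replaced by a plain closure, and the names DatagramWriter, send, max_len and pending_out are hypothetical, chosen only for illustration. It assumes the datagram transport has some maximum message size.

```rust
use std::io::{self, Write};

/// Hypothetical write half: `send` stands in for the datagram `Sink`.
struct DatagramWriter<F> {
    send: F,
    max_len: usize,       // assumed upper bound on the size of one datagram
    pending_out: Vec<u8>, // bytes written but not yet cut into datagrams
}

impl<F> DatagramWriter<F>
where
    F: FnMut(Vec<u8>) -> io::Result<()>,
{
    fn new(send: F, max_len: usize) -> Self {
        assert!(max_len > 0, "max_len must be positive");
        DatagramWriter { send, max_len, pending_out: Vec::new() }
    }
}

impl<F> Write for DatagramWriter<F>
where
    F: FnMut(Vec<u8>) -> io::Result<()>,
{
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        // Only buffer here (unbounded in this sketch); datagrams are cut in `flush`.
        self.pending_out.extend_from_slice(buf);
        Ok(buf.len())
    }

    fn flush(&mut self) -> io::Result<()> {
        // Cut the buffered bytes into datagrams of at most `max_len` bytes.
        // If `send` fails, the datagram being sent is lost in this sketch.
        while !self.pending_out.is_empty() {
            let n = self.max_len.min(self.pending_out.len());
            let datagram: Vec<u8> = self.pending_out.drain(..n).collect();
            (self.send)(datagram)?;
        }
        Ok(())
    }
}
```

With a real Sink, the call to send would presumably become start_send followed by poll_complete, and a full sink would have to be reported as WouldBlock; that part is not shown here.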
I began writing a short piece of code attempting to demonstrate what I'm trying to do (This code only deals with receiving data):
use std::{io, cmp};
use futures::sync::mpsc;
use futures::{Async, Stream};

struct StreamReceiver<M> {
    opt_receiver: Option<M>,
    pending_in: Vec<u8>,
}

impl<M> StreamReceiver<M> {
    fn new(receiver: M) -> Self {
        StreamReceiver {
            opt_receiver: Some(receiver),
            pending_in: Vec::new(),
        }
    }
}

impl<M> io::Read for StreamReceiver<M>
where
    M: Stream<Item=Vec<u8>, Error=()>,
{
    fn read(&mut self, mut buf: &mut [u8]) -> io::Result<usize> {
        let mut total_read = 0; // Total amount of bytes read
        loop {
            // pending_in --> buf (As many bytes as possible)
            let min_len = cmp::min(buf.len(), self.pending_in.len());
            buf[.. min_len].copy_from_slice(&self.pending_in[.. min_len]);
            let _ = self.pending_in.drain(.. min_len);
            buf = &mut buf[min_len ..];
            total_read += min_len;

            if self.pending_in.len() > 0 || buf.len() == 0 {
                return Ok(total_read);
            }

            match self.opt_receiver.take() {
                Some(mut receiver) => {
                    match receiver.poll() {
                        Ok(Async::Ready(Some(data))) => {
                            self.opt_receiver = Some(receiver);
                            self.pending_in = data;
                        },
                        Ok(Async::Ready(None)) => return Ok(total_read), // End of incoming data
                        Ok(Async::NotReady) => {
                            self.opt_receiver = Some(receiver);
                            if total_read > 0 {
                                return Ok(total_read)
                            } else {
                                return Err(io::Error::new(io::ErrorKind::WouldBlock, "WouldBlock"))
                            }
                        },
                        Err(()) => return Err(io::Error::new(io::ErrorKind::BrokenPipe, "BrokenPipe")),
                    };
                },
                None => return Ok(total_read),
            }
        }
    }
}
Some notes about the code:
- This code compiles, but I haven't tested it, so it might be incorrect in some cases.
- I am worried that I am mixing things from two worlds here: the async world (futures) and the synchronous world of std::io. Maybe something is fundamentally wrong here.
- I'm not yet sure what would be the right way to test this kind of code.
- I am aware that things could probably be more efficient with Bytes instead of Vec, but I know I can optimize things later.
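On the testing question, one direction I can imagine is to check the buffering logic separately from the async part. The sketch below is an assumption-heavy simplification, not the code above: the Stream is replaced by a plain Iterator so that no runtime is needed, and IterReceiver and read_all_in_chunks are hypothetical names. It does not exercise the NotReady / WouldBlock path at all.

```rust
use std::cmp;
use std::io::{self, Read};

/// Synchronous stand-in for `StreamReceiver`: datagrams come from an
/// iterator instead of a futures `Stream` (assumption made for testing).
struct IterReceiver<I> {
    datagrams: I,
    pending_in: Vec<u8>,
}

impl<I: Iterator<Item = Vec<u8>>> Read for IterReceiver<I> {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        // Refill from the next non-empty datagram when nothing is buffered.
        while self.pending_in.is_empty() {
            match self.datagrams.next() {
                Some(data) => self.pending_in = data,
                None => return Ok(0), // end of incoming data
            }
        }
        // pending_in --> buf (as many bytes as fit)
        let n = cmp::min(buf.len(), self.pending_in.len());
        buf[..n].copy_from_slice(&self.pending_in[..n]);
        self.pending_in.drain(..n);
        Ok(n)
    }
}

/// Reads until end of data through a buffer of `chunk` bytes and
/// returns every byte that was read, in order.
fn read_all_in_chunks<R: Read>(mut reader: R, chunk: usize) -> io::Result<Vec<u8>> {
    let mut out = Vec::new();
    let mut buf = vec![0u8; chunk];
    loop {
        let n = reader.read(&mut buf)?;
        if n == 0 {
            return Ok(out);
        }
        out.extend_from_slice(&buf[..n]);
    }
}
```

The property worth checking is that, for any read buffer size, the bytes come out identical to the concatenation of the datagrams, including when some datagrams are empty. Testing the real Stream-based version would additionally need a receiver that can report NotReady.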
Any help is appreciated!