I have recently been learning a lot about Futures and Streams, about async, and about the hyper crate.
My micro-service (powered by Rust and hyper) receives POST data in a plain-text format that is processed line by line. Using tokio, hyper and futures-preview in their alpha versions, I can build a fully async service. hyper::Body is basically a stream of hyper::Chunks, which are themselves basically byte slices.
I immediately saw an opportunity to write my own Stream wrapper that would consume the hyper::Body and be a Stream of Vec<u8>. Instead of returning each chunk, it would buffer the chunks and return the body's contents line by line, splitting on '\n' (or "\r\n" for compatibility).
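To make the intended behaviour concrete, here is a minimal, std-only sketch of the splitting rule on its own, separate from any Stream machinery. The names take_line and split_lines are hypothetical and exist only for this illustration; chunks are modelled as plain byte slices rather than hyper::Chunk values.

```rust
use std::collections::VecDeque;

/// Hypothetical helper: removes one complete line from the front of `buffer`,
/// dropping the trailing "\n" or "\r\n". Returns `None` if no full line is buffered.
fn take_line(buffer: &mut VecDeque<u8>) -> Option<Vec<u8>> {
    let newline = buffer.iter().position(|&b| b == b'\n')?;
    // Remove the line together with its '\n' terminator.
    let mut line: Vec<u8> = buffer.drain(..=newline).collect();
    line.pop(); // drop the '\n'
    if line.last() == Some(&b'\r') {
        line.pop(); // drop the '\r' of a "\r\n" terminator
    }
    Some(line)
}

/// Hypothetical driver: feeds the chunks in order and collects every line,
/// including a final line that has no terminator.
fn split_lines(chunks: &[&[u8]]) -> Vec<Vec<u8>> {
    let mut buffer = VecDeque::new();
    let mut lines = Vec::new();
    for chunk in chunks {
        buffer.extend(chunk.iter().copied());
        // A single chunk may complete several lines at once.
        while let Some(line) = take_line(&mut buffer) {
            lines.push(line);
        }
    }
    if !buffer.is_empty() {
        lines.push(buffer.drain(..).collect());
    }
    lines
}
```

The point of the sketch is that a line may straddle a chunk boundary, so the buffer has to outlive any single chunk.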
I started from the source code of futures_util::stream::Chunks because that seemed somewhat similar to what I was trying to do, and I came up with this:
use core::pin::Pin;
use std::{
    vec::Vec,
    collections::VecDeque,
};
use futures::{
    stream::{ Stream, FusedStream },
    task::{ Context, Poll },
    ready,
};
use hyper::{ Result, Chunk };

const NEW_LINE: u8 = b'\n';
const CARRIAGE_RETURN: u8 = b'\r';

pub(super) struct BodyStream<St: Stream<Item = Result<Chunk>> + Unpin> {
    stream: St,
    terminated: bool,
    buffer: VecDeque<u8>,
}

impl<St: Stream<Item = Result<Chunk>> + Unpin> BodyStream<St> {
    pub(super) fn new(stream: St) -> Self {
        Self {
            stream,
            terminated: false,
            buffer: VecDeque::new(),
        }
    }

    fn stream(self: Pin<&mut Self>) -> Pin<&mut St> {
        unsafe { Pin::map_unchecked_mut(self, |s| { &mut s.stream }) }
    }

    fn take_line(mut self: Pin<&mut Self>) -> Option<Vec<u8>> {
        if !self.buffer.is_empty() {
            if let Some(i) = self.buffer.iter().position(|b| &NEW_LINE == b) {
                let trim: usize = if (i > 0) && (&CARRIAGE_RETURN == &self.buffer[i - 1]) { 1 } else { 0 };
                return Some(self.as_mut().buffer.drain(..= i).take(i - trim).collect::<Vec<_>>());
            }
        }
        None
    }
}

impl<St: Stream<Item = Result<Chunk>> + Unpin> Stream for BodyStream<St> {
    type Item = Result<Vec<u8>>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // Return immediately if a whole line is already buffered.
        if let Some(line) = self.as_mut().take_line() {
            return Poll::Ready(Some(Ok(line)));
        }
        // Continuously poll the source stream if it is not terminated.
        while !self.terminated {
            match ready!(self.as_mut().stream().poll_next(cx)) {
                // If a chunk is yielded by the source stream, append the chunk to the buffer
                // and scan the buffer to see if it contains a complete line.
                Some(Ok(chunk)) => {
                    self.as_mut().buffer.extend(chunk.into_bytes());
                    if let Some(line) = self.as_mut().take_line() {
                        return Poll::Ready(Some(Ok(line)));
                    }
                }
                // If an error is yielded by the source stream, pass it on.
                Some(Err(error)) => {
                    return Poll::Ready(Some(Err(error)))
                }
                // If the end of the source stream is reached, raise the `terminated` flag
                // and break the loop.
                None => {
                    self.terminated = true;
                    break
                }
            }
        }
        // Yield the remaining contents of the buffer, if any.
        if !self.buffer.is_empty() {
            let line = self.as_mut().buffer.drain(..).collect::<Vec<_>>();
            return Poll::Ready(Some(Ok(line)));
        }
        // The source stream has terminated and the buffer is empty.
        Poll::Ready(None)
    }
}

impl<St: Stream<Item = Result<Chunk>> + Unpin> FusedStream for BodyStream<St> {
    fn is_terminated(&self) -> bool {
        self.terminated && self.buffer.is_empty()
    }
}
It works.
But could it be better? This is my first implementation of Stream and FusedStream, so I have no idea whether I've done this well. Please give me a code review!
Specifically:
- I'm not certain that my unsafe block in stream() is safe in this case because I don't fully understand Pin and Unpin yet. I'm getting closer, but I'm not there yet. (I know I could use pin-utils for forwarding stream(), but I wanted to do it manually to learn what's going on.)
- I don't know if my use of the VecDeque is optimal, specifically my use of drain() and collect() to convert the first n elements into a vector. I know about as_slices() and could build the Vec from those with what would amount to one or two contiguous copy operations (i.e. fast and O(1)), but then I'd need something like truncate_front() to actually remove the copied items from the VecDeque.
- I somehow hope that my uses of drain(), take() and collect() are reduced to contiguous copy operations anyway. How smart are Rust's standard collections and iterator adaptors like these?
- General comments on style, practicality, safety and idiomatic-ness are always welcome.
Note: this code should run exactly as-is but requires...
[dependencies]
futures-preview = { version = "=0.3.0-alpha.19", features = ["async-await"] }
hyper = { version="0.13.0-alpha.4", features=["unstable-stream"] }