Read hyper::Body asynchronously, line by line (a.k.a. my first Stream impl)

I have recently been learning a lot about Futures and Streams, about async, and about the hyper crate.

My micro-service (powered by Rust & Hyper) receives POST data in a plain-text format that is processed line by line. Using tokio, hyper and futures-preview in their alpha versions, I can build a fully async service. hyper::Body is essentially a stream of hyper::Chunk values, each of which is essentially a byte slice.

I immediately saw an opportunity to write my own Stream wrapper that consumes the hyper::Body and is itself a Stream of Vec<u8>. Instead of returning each chunk, it buffers the chunks and returns the body's contents line by line, splitting on '\n' (or "\r\n" for compatibility).

I started from the source code of futures_util::stream::Chunks because that seemed somewhat similar to what I was trying to do and I came up with this:

use core::pin::Pin;
use std::{
    vec::Vec,
    collections::VecDeque,
};
use futures::{
    stream::{ Stream, FusedStream },
    task::{ Context, Poll },
    ready,
};
use hyper::{ Result, Chunk };

const NEW_LINE: u8 = b'\n';
const CARRIAGE_RETURN: u8 = b'\r';

pub(super) struct BodyStream<St: Stream<Item = Result<Chunk>> + Unpin> {
    stream: St,
    terminated: bool,
    buffer: VecDeque<u8>,
}

impl<St: Stream<Item = Result<Chunk>> + Unpin> BodyStream<St> {
    pub(super) fn new(stream: St) -> Self {
        Self {
            stream,
            terminated: false,
            buffer: VecDeque::new(),
        }
    }

    fn stream(self: Pin<&mut Self>) -> Pin<&mut St> {
        unsafe { Pin::map_unchecked_mut(self, |s| { &mut s.stream }) }
    }

    fn take_line(mut self: Pin<&mut Self>) -> Option<Vec<u8>> {
        if !self.buffer.is_empty() {
            if let Some(i) = self.buffer.iter().position(|b| &NEW_LINE == b) {
                let trim: usize = if (i > 0) && (&CARRIAGE_RETURN == &self.buffer[i - 1]) { 1 } else { 0 };
                return Some(self.as_mut().buffer.drain(..= i).take(i - trim).collect::<Vec<_>>());
            }
        }

        None
    }
}

impl<St: Stream<Item = Result<Chunk>> + Unpin> Stream for BodyStream<St> {
    type Item = Result<Vec<u8>>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // Return immediately if a whole line is already buffered.
        if let Some(line) = self.as_mut().take_line() {
            return Poll::Ready(Some(Ok(line)));
        }

        // Continuously poll the source stream if it is not terminated.
        while !self.terminated {
            match ready!(self.as_mut().stream().poll_next(cx)) {
                // If a chunk is yielded by the source stream, append the chunk to the buffer
                // and scan the buffer to see if it contains a complete line.
                Some(Ok(chunk)) => {
                    self.as_mut().buffer.extend(chunk.into_bytes());
                    if let Some(line) = self.as_mut().take_line() {
                        return Poll::Ready(Some(Ok(line)));
                    }
                }

                // If an error is yielded by the source stream, pass it on.
                Some(Err(error)) => {
                    return Poll::Ready(Some(Err(error)))
                }

                // If the end of the source stream is reached, raise the `terminated` flag
                // and break the loop.
                None => {
                    self.terminated = true;
                    break
                }
            }
        }

        // Yield the remaining contents of the buffer, if any.
        if !self.buffer.is_empty() {
            let line = self.as_mut().buffer.drain(..).collect::<Vec<_>>();
            return Poll::Ready(Some(Ok(line)));
        }

        // The source stream has terminated and the buffer is empty.
        Poll::Ready(None)
    }
}

impl<St: Stream<Item = Result<Chunk>> + Unpin> FusedStream for BodyStream<St> {
    fn is_terminated(&self) -> bool {
        self.terminated && self.buffer.is_empty()
    }
}

It works.

But could it be better? This is my first implementation of Stream and FusedStream so I have no idea if I've done this perfectly or not. Please give me a code-review!

Specifically:

  • I'm not certain that my unsafe block in stream() is sound in this case because I don't fully understand Pin and Unpin yet. I'm getting closer but I'm not there. (I know I could use pin-utils for forwarding stream() but I wanted to do things manually to learn what's going on.)
  • I don't know if my use of the VecDeque is optimal, specifically my use of drain() and collect() to convert the first n elements into a vector. I know about as_slices() and could build the Vec from those with what would amount to one or two contiguous copy operations (i.e. fast bulk copies rather than byte-by-byte iteration), but then I'd need something like truncate_front() to actually remove the copied items from the VecDeque.
  • I'm hoping that my chain of drain(), take() and collect() is reduced to contiguous copy operations anyway. How smart are Rust's standard collections and iterator adaptors like these?
  • General comments on style, practicality, safety and idiomatic-ness are always welcome.

Note: this code should run exactly as-is but requires the following dependencies:

[dependencies]
futures-preview = { version = "=0.3.0-alpha.19", features = ["async-await"] }
hyper = { version = "0.13.0-alpha.4", features = ["unstable-stream"] }

Because you're requiring St: Unpin, you shouldn't need any unsafe: you can ignore pinning entirely and use Pin::new wherever you need to call other methods. If you want to properly project a pin of your type to a pin of a field, I would strongly recommend pin-project, which provides a safe abstraction so that you can't make a mistake.
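As a rough sketch of what dropping the unsafe could look like: the struct below is a simplified stand-in for the BodyStream in the question, with the Stream bound left out so that it compiles with std alone. Only the stream() method is the point here; everything else is scaffolding.

```rust
use std::collections::VecDeque;
use std::pin::Pin;

// Simplified stand-in for the `BodyStream` from the question: same fields,
// but without the `Stream` bound so the sketch only needs `std`.
pub struct BodyStream<St: Unpin> {
    stream: St,
    terminated: bool,
    buffer: VecDeque<u8>,
}

impl<St: Unpin> BodyStream<St> {
    pub fn new(stream: St) -> Self {
        Self { stream, terminated: false, buffer: VecDeque::new() }
    }

    // Safe replacement for the `unsafe` projection. Every field is `Unpin`,
    // so `Self` is `Unpin` too and `Pin::get_mut` is available.
    pub fn stream(self: Pin<&mut Self>) -> Pin<&mut St> {
        Pin::new(&mut self.get_mut().stream)
    }

    pub fn is_terminated(&self) -> bool {
        self.terminated && self.buffer.is_empty()
    }
}
```

The same body should work unchanged inside the original impl block, since the Unpin bound on St is what makes Pin::get_mut and Pin::new usable there.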

I'm not sure about the VecDeque usage; I never got around to trying it out for anything similar to this. I do know that using memchr to scan for the end of a line can provide a very significant speed boost (even for just a 16-character line I've seen a 7% speedup in the past).
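A minimal sketch of how a slice-based search could be applied to a VecDeque, assuming the search runs over the two contiguous halves returned by as_slices(). The helper find_byte is a std-only placeholder with the same shape as memchr::memchr(needle, haystack), so the memchr crate could be swapped in there; find_newline is a hypothetical name.

```rust
use std::collections::VecDeque;

// Std-only placeholder with the same shape as `memchr::memchr(needle, haystack)`.
fn find_byte(needle: u8, haystack: &[u8]) -> Option<usize> {
    haystack.iter().position(|&b| b == needle)
}

// Index of the first `\n` in the deque, searching its two contiguous halves
// in order and offsetting a hit in the second half by the first half's length.
pub fn find_newline(buffer: &VecDeque<u8>) -> Option<usize> {
    let (front, back) = buffer.as_slices();
    find_byte(b'\n', front)
        .or_else(|| find_byte(b'\n', back).map(|i| front.len() + i))
}
```

Searching the slices rather than buffer.iter() is what lets a specialised byte search be used at all, since it needs contiguous memory to work on.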

Rather than using iterator adaptors like that, I would just go for Vec::split_off (or maybe VecDeque::split_off), since that's guaranteed to be a simple copy and can often result in more readable code for simple splitting like this.
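For illustration, here is one way take_line might look with VecDeque::split_off. It is written as a free function over the buffer (an assumption made to keep the sketch self-contained) and is meant to match the trimming behaviour of the original: the '\n' is dropped, along with a preceding '\r' if there is one.

```rust
use std::collections::VecDeque;
use std::mem;

// Removes and returns the first complete line from `buffer`, without its
// line terminator. Returns `None` if no `\n` has been buffered yet.
pub fn take_line(buffer: &mut VecDeque<u8>) -> Option<Vec<u8>> {
    let newline = buffer.iter().position(|&b| b == b'\n')?;
    // `split_off` returns everything after the newline; swapping that back
    // into `buffer` leaves `line` holding the bytes up to and including `\n`.
    let rest = buffer.split_off(newline + 1);
    let mut line = mem::replace(buffer, rest);
    line.pop_back(); // drop the `\n`
    if line.back() == Some(&b'\r') {
        line.pop_back(); // drop the `\r` of a `\r\n` pair
    }
    Some(Vec::from(line))
}
```

One thing to keep in mind is that split_off moves the tail into a new collection, so the copy here is proportional to what remains in the buffer rather than to the length of the line. Whether that matters depends on how much data is typically buffered past the first newline.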

Personally, I would probably use futures::stream::IntoAsyncRead to convert the stream of chunks back into a byte stream, then futures_codec with a simple line-based codec to convert the byte stream into a stream of lines. That would probably sacrifice some performance, as it won't be copying exactly one chunk at a time from the underlying stream, but probably not enough to be significant (and futures_codec may someday get support for AsyncBufRead, which would mitigate that).
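The layering described above (chunks, then a byte stream, then lines) can be illustrated with a synchronous, std-only analogy. To be clear, this is not the futures or futures_codec API: ChunkReader and lines are hypothetical names, an Iterator of Vec<u8> stands in for the stream of chunks, and std::io::Read plus BufRead::split stand in for the async reader and the line codec.

```rust
use std::io::{self, BufRead, BufReader, Read};

// Hypothetical adapter: presents an iterator of byte chunks as one `Read`.
pub struct ChunkReader<I> {
    chunks: I,
    current: Vec<u8>,
    pos: usize,
}

impl<I: Iterator<Item = Vec<u8>>> ChunkReader<I> {
    pub fn new(chunks: I) -> Self {
        Self { chunks, current: Vec::new(), pos: 0 }
    }
}

impl<I: Iterator<Item = Vec<u8>>> Read for ChunkReader<I> {
    fn read(&mut self, out: &mut [u8]) -> io::Result<usize> {
        // Move on to the next non-empty chunk once the current one is used up.
        while self.pos == self.current.len() {
            match self.chunks.next() {
                Some(chunk) => {
                    self.current = chunk;
                    self.pos = 0;
                }
                None => return Ok(0), // end of input
            }
        }
        let n = out.len().min(self.current.len() - self.pos);
        out[..n].copy_from_slice(&self.current[self.pos..self.pos + n]);
        self.pos += n;
        Ok(n)
    }
}

// Layers a line splitter on top of the byte stream, trimming a trailing `\r`.
pub fn lines<I>(chunks: I) -> impl Iterator<Item = io::Result<Vec<u8>>>
where
    I: Iterator<Item = Vec<u8>>,
{
    BufReader::new(ChunkReader::new(chunks)).split(b'\n').map(|line| {
        line.map(|mut bytes| {
            if bytes.last() == Some(&b'\r') {
                bytes.pop();
            }
            bytes
        })
    })
}
```

The point of the design is that line splitting no longer needs to know about chunk boundaries; the reader layer hides them, at the cost of an extra copy through the reader's buffer.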
