Help with streaming futures in actix-web

Cross-posted from Reddit; apologies if that’s poor etiquette

TL;DR: I’m looking for a good resource on working with Futures/Streams.

Long version: I’m just in the process of falling in love with Rust, and working on my first real project, which is intended to sit between systems and an InfluxDB instance, and modify or re-route the posted data.

The InfluxDB line protocol allows up to 10K metrics in a single post, delimited by \n. I’ve successfully created a nom parser to parse these lines into structs consisting of &[u8] slices, which made me feel very clever.

So now I’ve got to hook this up to a web server, and I’m looking at actix.rs for that. It provides the request body as a streaming future (is that the right term?), and the docs and examples show using .concat2() or bytes::BytesMut to read all the bytes into memory. But given that the data may be pretty big, I’d like to process it as a stream, reading a chunk at a time, checking for a \n character in what’s been read so far, and trying to parse the data up to that point.

Problem is, I haven’t been able to find any good documentation on working with Streams. For example, this page on tokio.rs “hasn’t been worked on yet”.

Can anyone point me to a good resource for working with Streams? I’m happy to buy a book or pay for a course if it has the information. Or even just a quick code sample would be great.

Or, of course, if I’m over-complicating things and I should just concat2() the request then please tell me that :slight_smile:

You can try wrapping the stream with a custom stream that buffers the underlying chunks and carves out line delimited fragments out of it.

It looks like actix has https://actix.rs/actix-web/actix_web/dev/struct.PayloadBuffer.html#method.readline. That method’s return type makes it amenable to wrapping via https://docs.rs/futures/0.1.25/futures/stream/fn.poll_fn.html. So you can try to combine these two things to give you a Stream of Bytes, where each Bytes instance is a line; you can then attach continuations/combinators onto this stream that parse/process the individual lines.

Thank you! Took me a while (still learning) but I’ve ended up with this:

fn intercept(req: &HttpRequest) -> BoxFut {
    let mut payload_buffer = PayloadBuffer::new(req.payload() );
    poll_fn(move || -> Poll<Option<Bytes>, PayloadError> { payload_buffer.readline() })
        .from_err()
        .fold(
            0,
            move |counter, buf| {
                // do the processing...
            }
        ).and_then(|_| Ok(HttpResponse::Ok().finish()))
        .responder()
}