How to interrupt Stream::concat2 by maximum chunk size?

I would like to collect a stream into a single concatenated String of limited size, in a non-blocking way. I have found that concat2 can do the concatenation:

    fn get_body(&self, body: hyper::Body) -> Box<Future<Item=String, Error=<Self as hyper::client::Service>::Error>> {
        let full_body = body.concat2()
            .map(|chunk| {
                let v = chunk.to_vec();
                String::from_utf8_lossy(&v).to_string()
            });
        Box::new(full_body)
    }

So far so good. Now I am trying to figure out how to make sure that the returned Future resolves either:

  • into a string that is no longer than X bytes or chars,
    OR
  • into an error indicating that the payload is too big, with the whole body consumed and discarded.

The idea is to protect against resource-exhaustion attacks by a client, i.e. to make sure that RAM is not exhausted by a large HTTP request.

How could I achieve it with hyper::Body / Stream<Chunk> API? What additional API / libraries should I use for this? Is there an example of doing this?

You can put a combinator in front of concat2, like map, that does byte accounting: if the byte count goes over the limit it returns an error, otherwise it returns the chunk. Alternatively, you can mimic concat2 with a fold that does essentially the same thing: it checks the byte count and, if it is within the limit, appends to a String; otherwise it returns an error.
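The fold variant can be sketched without any async machinery. This is only an illustration, not the hyper or futures API: a plain Iterator stands in for the body stream, and collect_limited and TooLarge are hypothetical names. With a real Stream the same accounting would live inside the fold (or and_then) closure.

```rust
/// Hypothetical error type for a body that exceeds the limit.
#[derive(Debug, PartialEq)]
pub struct TooLarge;

/// Concatenates chunks into a String, failing as soon as the running
/// byte count would exceed `max_bytes`.
/// `chunks` is a stand-in for the body stream (an assumption of this sketch).
pub fn collect_limited<I, C>(chunks: I, max_bytes: usize) -> Result<String, TooLarge>
where
    I: IntoIterator<Item = C>,
    C: AsRef<[u8]>,
{
    let mut chunks = chunks.into_iter();
    // try_fold stops at the first Err, so nothing past the limit is buffered.
    let body = chunks.try_fold(Vec::new(), |mut acc: Vec<u8>, chunk| {
        let bytes: &[u8] = chunk.as_ref();
        // acc.len() never exceeds max_bytes, so this subtraction cannot underflow.
        if bytes.len() > max_bytes - acc.len() {
            Err(TooLarge)
        } else {
            acc.extend_from_slice(bytes);
            Ok(acc)
        }
    })?;
    // Convert once at the end: a chunk boundary may split a multi-byte character.
    Ok(String::from_utf8_lossy(&body).into_owned())
}
```

Note that this sketch stops reading at the first chunk that crosses the limit. If the rest of the body has to be consumed and discarded, the fold would need to keep iterating while remembering that the limit was hit.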

@vitalyd, thank you! I have tried the map combinator approach in front of concat2 (I used and_then instead of map to flatten Result<Chunk> back to Chunk for concat2). I am struggling to satisfy the lifetime requirements (I am not even sure which lifetime is involved). Could you please advise what is wrong with my mapping function?

    fn get_body(&self, body: hyper::Body) -> Box<Future<Item=String, Error=<Self as hyper::client::Service>::Error>> {
        let mut sum_size = 0;
        let chain = body.and_then(move |c| {
            {
                let c_ref = c.as_ref();
                let bounds = c_ref.bytes().size_hint();
                sum_size += bounds.1.unwrap_or(self.max_body_size + 1);
            }
            let result = if sum_size > self.max_body_size {
                Err(hyper::error::Error::TooLarge)
            } else {
                Ok(c)
            };
            result
        });

        let full_body = chain.concat2()
            .map(|chunk| {
                let v = chunk.to_vec();
                String::from_utf8_lossy(&v).to_string()
            });
        Box::new(full_body)
    }

Here is the lifetime error:

error[E0495]: cannot infer an appropriate lifetime due to conflicting requirements
  --> src\server.rs:58:9
   |
58 |         Box::new(full_body)
   |         ^^^^^^^^^^^^^^^^^^^
   |
note: first, the lifetime cannot outlive the anonymous lifetime #1 defined on the method body at 36:5...
  --> src\server.rs:36:5
   |
36 | /     fn get_body(&self, body: hyper::Body) -> Box<Future<Item=String, Error=<Self as hyper::client::Service>::Error>> {
37 | |         let mut sum_size = 0;
38 | |         let chain = body.and_then(move |c| {
39 | |             {
...  |
58 | |         Box::new(full_body)
59 | |     }
   | |_____^
note: ...so that the type `futures::Map<futures::stream::Concat2<futures::stream::AndThen<hyper::Body, [closure@src\server.rs:38:35: 50:10 sum_size:usize, self:&server::AgentService], std::result::Result<hyper::Chunk, hyper::Error>>>, [closure@src\server.rs:54:18: 57:14]>` will meet its required lifetime bounds
  --> src\server.rs:58:9
   |
58 |         Box::new(full_body)
   |         ^^^^^^^^^^^^^^^^^^^
   = note: but, the lifetime must be valid for the static lifetime...
note: ...so that expression is assignable (expected std::boxed::Box<futures::Future<Item=std::string::String, Error=hyper::Error> + 'static>, found std::boxed::Box<futures::Future<Item=std::string::String, Error=hyper::Error>>)
  --> src\server.rs:58:9
   |
58 |         Box::new(full_body)
   |         ^^^^^^^^^^^^^^^^^^^

Looks like the first closure is capturing a reference to AgentService, which is self, while the boxed Future is required to be 'static. Try copying self.max_body_size into a local binding before the closure, then use that binding inside the closure instead of self.max_body_size.
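A minimal sketch of that fix, reduced to a boxed closure so that it compiles without hyper. AgentService and max_body_size come from the error message above; size_checker is a hypothetical method, and the dyn syntax assumes a newer compiler than the one used in this thread.

```rust
pub struct AgentService {
    pub max_body_size: usize,
}

impl AgentService {
    /// Returns a boxed closure. Box<dyn Trait> defaults to a 'static bound,
    /// just like the boxed Future, so the closure must not borrow from self.
    pub fn size_checker(&self) -> Box<dyn FnMut(usize) -> bool> {
        // Copy the field into a local first: usize is Copy, so the
        // `move` closure captures this value instead of `self`.
        let max_body_size = self.max_body_size;
        let mut sum_size = 0usize;
        Box::new(move |chunk_len: usize| {
            sum_size += chunk_len;
            // true while the running total is still within the limit
            sum_size <= max_body_size
        })
    }
}
```

Writing self.max_body_size inside the closure instead would capture self by reference and fail with the same kind of lifetime error.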

Oh, great, that fixed it. I could not work that out from the compiler error!

FYI, the hint was in this part of the error message:

    [closure@src\server.rs:38:35: 50:10 sum_size:usize, self:&server::AgentService]

Note how it tells you the fields of the closure there, including the self:&server::AgentService - that’s the key part showing a reference being captured.

Thank you! I can see it now. I will try to use it next time.

Separately,

    let c_ref = c.as_ref();
    let bounds = c_ref.bytes().size_hint();
    sum_size += bounds.1.unwrap_or(self.max_body_size + 1);

Chunk derefs to [u8] so you can just do c.deref().len() to get the number of bytes in it.
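To illustrate, here is a small sketch with a hypothetical stand-in type. This Chunk is not hyper's type; it only mirrors the Deref<Target = [u8]> behaviour described above, and both helper functions are made up for the example.

```rust
use std::ops::Deref;

/// Hypothetical stand-in for hyper::Chunk (an assumption of this sketch).
pub struct Chunk(pub Vec<u8>);

impl Deref for Chunk {
    type Target = [u8];

    fn deref(&self) -> &[u8] {
        &self.0
    }
}

/// Byte length through an explicit deref() call.
pub fn len_via_deref(c: &Chunk) -> usize {
    c.deref().len()
}

/// Byte length through method auto-deref: Chunk has no len() of its own,
/// so the call resolves to the slice's len().
pub fn len_via_autoderef(c: &Chunk) -> usize {
    c.len()
}
```

With a type like this, the accounting line could shrink to something like sum_size += c.len(), assuming the real Chunk behaves the same way.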

Great stuff! I knew this should exist, but could not find it.

@vitalyd, as a follow-up question: what is the difference between c.deref() and using the *c operator? Is it just syntax sugar, or is there something more behind it?

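It’s the operator (i.e. syntax sugar) for calling std::ops::Deref::deref(). More precisely, *c is equivalent to *std::ops::Deref::deref(&c): deref() returns a reference, and the operator then yields the value behind it.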
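A tiny sketch of that equivalence, using Rc<String> from the standard library rather than Chunk; both function names are hypothetical.

```rust
use std::ops::Deref;
use std::rc::Rc;

/// Goes through the smart pointer with the `*` operator.
pub fn len_with_star(c: Rc<String>) -> usize {
    (*c).len()
}

/// The same access with the trait method spelled out.
pub fn len_with_deref(c: Rc<String>) -> usize {
    Deref::deref(&c).len()
}
```

Both functions reach the same String; the operator form just hides the call to deref().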

Thank you