I would like to collect a stream into a single concatenated String of limited size, in a non-blocking way. I have found that concat2 can do the concatenation:
fn get_body(&self, body: hyper::Body) -> Box<Future<Item=String, Error=<Self as hyper::client::Service>::Error>> {
    let full_body = body.concat2()
        .map(|chunk| {
            let v = chunk.to_vec();
            String::from_utf8_lossy(&v).to_string()
        });
    Box::new(full_body)
}
It is good so far. Now I am trying to figure out how to make sure that the returned Future resolves either:
into a string that is no longer than X bytes or chars,
OR
into an error indicating that the payload is too big, with the whole body consumed and discarded.
The idea is to protect against resource-exhaustion attacks by a client, i.e. to make sure that RAM is not exhausted by a large HTTP request.
How could I achieve this with the hyper::Body / Stream<Chunk> API? What additional APIs or libraries should I use for this? Is there an example of doing this?
You can put a combinator, like map, in front of concat2 that does the byte accounting: if the byte count goes over the limit it returns an error, otherwise it returns the chunk. Alternatively, you can mimic concat2 with a fold that does essentially the same thing: it checks the byte count and, if that is ok, appends to a String; otherwise it returns an error.
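The fold variant could look roughly like the sketch below. To stay self-contained it folds over a plain iterator of byte chunks rather than a real hyper::Body, and `collect_limited` and `TooLarge` are hypothetical names made up for the illustration. With futures 0.1 the same closure body should fit `Stream::fold`, since a `Result` returned from the closure converts into a future.

```rust
/// Hypothetical error type standing in for hyper's "too large" error.
#[derive(Debug, PartialEq)]
struct TooLarge;

/// Appends chunks to a buffer, failing as soon as the running byte
/// count would exceed `max_bytes`.
fn collect_limited<I>(chunks: I, max_bytes: usize) -> Result<String, TooLarge>
where
    I: IntoIterator<Item = Vec<u8>>,
{
    let bytes = chunks
        .into_iter()
        .try_fold(Vec::new(), |mut acc: Vec<u8>, chunk| {
            // Check before appending so the buffer never grows past the limit.
            if acc.len() + chunk.len() > max_bytes {
                Err(TooLarge)
            } else {
                acc.extend_from_slice(&chunk);
                Ok(acc)
            }
        })?;
    // Decode once at the end so a multi-byte character split across two
    // chunks is still decoded correctly.
    Ok(String::from_utf8_lossy(&bytes).into_owned())
}
```

Note that this sketch stops at the first chunk that crosses the limit; it does not drain the remaining chunks, so consuming and discarding the rest of the body would need an extra step.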
@vitalyd, thank you! I have tried the approach with a combinator in front of concat2 (I used and_then instead of map to flatten Result<Chunk> back to Chunk for concat2). I am struggling to satisfy the lifetime requirements (I am not even sure which value's lifetime is the problem). Could you please advise what is wrong with my mapping function?
fn get_body(&self, body: hyper::Body) -> Box<Future<Item=String, Error=<Self as hyper::client::Service>::Error>> {
    let mut sum_size = 0;
    let chain = body.and_then(move |c| {
        {
            let c_ref = c.as_ref();
            let bounds = c_ref.bytes().size_hint();
            sum_size += bounds.1.unwrap_or(self.max_body_size + 1);
        }
        let result = if sum_size > self.max_body_size {
            Err(hyper::error::Error::TooLarge)
        } else {
            Ok(c)
        };
        result
    });
    let full_body = chain.concat2()
        .map(|chunk| {
            let v = chunk.to_vec();
            String::from_utf8_lossy(&v).to_string()
        });
    Box::new(full_body)
}
Here is the lifetime error:
error[E0495]: cannot infer an appropriate lifetime due to conflicting requirements
--> src\server.rs:58:9
|
58 | Box::new(full_body)
| ^^^^^^^^^^^^^^^^^^^
|
note: first, the lifetime cannot outlive the anonymous lifetime #1 defined on the method body at 36:5...
--> src\server.rs:36:5
|
36 | / fn get_body(&self, body: hyper::Body) -> Box<Future<Item=String, Error=<Self as hyper::client::Service>::Error>> {
37 | | let mut sum_size = 0;
38 | | let chain = body.and_then(move |c| {
39 | | {
... |
58 | | Box::new(full_body)
59 | | }
| |_____^
note: ...so that the type `futures::Map<futures::stream::Concat2<futures::stream::AndThen<hyper::Body, [closure@src\server.rs:38:35: 50:10 sum_size:usize, self:&server::AgentService], std::result::Result<hyper::Chunk, hyper::Error>>>, [closure@src\server.rs:54:18: 57:14]>` will meet its required lifetime bounds
--> src\server.rs:58:9
|
58 | Box::new(full_body)
| ^^^^^^^^^^^^^^^^^^^
= note: but, the lifetime must be valid for the static lifetime...
note: ...so that expression is assignable (expected std::boxed::Box<futures::Future<Item=std::string::String, Error=hyper::Error> + 'static>, found std::boxed::Box<futures::Future<Item=std::string::String, Error=hyper::Error>>)
--> src\server.rs:58:9
|
58 | Box::new(full_body)
| ^^^^^^^^^^^^^^^^^^^
Looks like the first closure is capturing a reference to AgentService, which is self. Because the boxed Future has to be 'static, it cannot hold on to that borrowed reference. Try putting self.max_body_size into a local binding before the closure, and then use that binding in the closure instead of self.max_body_size.
FYI, the hint was in this part of the error message:
[closure@src\server.rs:38:35: 50:10 sum_size:usize, self:&server::AgentService]
Note how it tells you the fields of the closure there, including the self:&server::AgentService - that’s the key part showing a reference being captured.
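The local-binding fix can be illustrated without hyper. This is a minimal sketch in which the `AgentService` struct, the `size_checker` method and the `TooLarge` type are hypothetical stand-ins; the boxed closure plays the role of the boxed Future, which likewise has to be 'static.

```rust
/// Hypothetical stand-in for the service from the question.
struct AgentService {
    max_body_size: usize,
}

/// Hypothetical error type for the illustration.
#[derive(Debug, PartialEq)]
struct TooLarge;

impl AgentService {
    /// Returns a boxed closure that does the byte accounting.
    /// `Box<dyn FnMut(..)>` with no explicit lifetime means `+ 'static`,
    /// so the closure must not hold on to `&self`.
    fn size_checker(&self) -> Box<dyn FnMut(usize) -> Result<usize, TooLarge>> {
        // Copy the limit out of `self` first; the `move` closure then
        // captures this `usize` by value instead of borrowing `self`.
        let max_body_size = self.max_body_size;
        let mut sum_size = 0usize;
        Box::new(move |chunk_len| {
            sum_size += chunk_len;
            if sum_size > max_body_size {
                Err(TooLarge)
            } else {
                Ok(sum_size)
            }
        })
    }
}
```

Using `self.max_body_size` directly inside the closure would make it capture `self` (a reference), and the compiler would reject the boxed return value for much the same reason as in the error above.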
@vitalyd, as a follow-up question: what is the difference between calling c.deref() and using the *c operator? Is it just syntactic sugar, or is there something more behind it?