I have a web server written with hyper. To mitigate memory exhaustion caused by large request bodies, I stop consuming the body stream as soon as I detect that it has grown too large. With this logic in place, the process terminates silently whenever a client submits a request larger than max_size. Is it expected behavior for a hyper server to terminate the process in this case?
impl Server {
    fn get_body(&self, body: hyper::Body) -> Box<Future<Item = String, Error = errors::Error>> {
        let mut sum_size = 0;
        let max_size = 1024 * 1024;
        // Abort the stream with an error as soon as the accumulated size exceeds the limit.
        let chain = body.and_then(move |c| {
            sum_size += c.deref().len();
            if sum_size > max_size {
                futures::future::err(hyper::error::Error::TooLarge)
            } else {
                futures::future::ok(c)
            }
        });
        // Collect the chunks and turn them into a String.
        let full_body = chain.concat2()
            .map(|chunk| {
                let v = chunk.to_vec();
                String::from_utf8_lossy(&v).to_string()
            });
        Box::new(full_body.then(|r| Ok(r.chain_err(|| "fetching request body")?)))
    }
}
impl hyper::server::Service for Server {
    type Request = Request;
    type Response = Response;
    type Error = hyper::Error;
    type Future = Box<Future<Item = Self::Response, Error = Self::Error>>;

    fn call(&self, _req: Request) -> Box<Future<Item = Self::Response, Error = Self::Error>> {
        let resp = self.get_body(_req.body()).map(|_| hyper::StatusCode::NotFound);
        let resp = resp.then(move |r: errors::Result<hyper::StatusCode>| {
            match r {
                Ok(s) => Ok(Response::new().with_status(s)),
                Err(_e) => Ok(Response::new().with_status(hyper::StatusCode::BadRequest)),
            }
        });
        Box::new(resp)
    }
}
In fact, the process silently exits with code 0 after the very first request even with the following implementation, which does not consume the request body at all. I think this is a bug, but maybe I am doing something wrong. Could you please suggest what I should do to fix it?
impl hyper::server::Service for Server {
    type Request = Request;
    type Response = Response;
    type Error = hyper::Error;
    type Future = Box<Future<Item = Self::Response, Error = Self::Error>>;

    fn call(&self, _req: Request) -> Box<Future<Item = Self::Response, Error = Self::Error>> {
        let resp = Box::new(futures::future::ok(hyper::StatusCode::NotFound));
        let resp = resp.then(move |r: errors::Result<hyper::StatusCode>| {
            match r {
                Ok(s) => Ok(Response::new().with_status(s)),
                Err(_e) => Ok(Response::new().with_status(hyper::StatusCode::BadRequest)),
            }
        });
        Box::new(resp)
    }
}
As a start, enable trace level logging and see what hyper (and maybe tokio) are spitting out. Paste it here.
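If you're using env_logger, something like this should work (from memory, so adjust to whatever env_logger version you're on):

// once at the top of main(), before the Core is created
env_logger::init().unwrap();

and then launch with e.g. RUST_LOG=hyper=trace,tokio_core=trace.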
It almost sounds like an error is causing your entire event loop (reactor) to exit, as if the request handling were the root future (it shouldn't be - hyper spawns connection handling as a background task).
PS It is strange: I can get hyper logs with env_logger, or tokio logs with stderrlog, but not both together. That is a separate story and I am still investigating, but the hyper logs from the terminated process are above.
So that all looks fine to me, although I'm not a hyper expert. The body wasn't consumed, as intended, and hyper drops the connection - seems pretty innocent.
So back to how you’re running hyper - can you show that code?
Also, you say the server exits with 0. Do you have logging for when the server run call exits? Do you handle/print errors there? As I mentioned upthread, it almost sounds like handling this connection was the root future on the event loop, rather than a background one.
let listener: TcpListener = TcpListener::bind(&address, &handle)
    .chain_err(|| format!("binding to {}", address))?;
let server = listener.incoming()
    .into_future()
    .map_err(|_| unreachable!());
let server = server
    .and_then(|(item, _incoming)| {
        let (stream, _) = item.unwrap();
        let service = Server::new();
        Http::<hyper::Chunk>::new().serve_connection(stream, service)
            // ignore Opaque from hyper::server, will likely get changed in the future
            // see more here: https://users.rust-lang.org/t/what-is-hyper-server-unnameable-opaque/15329
            .map(|_| ())
    });
let result = core.run(server).chain_err(|| "launching the server");
debug!("Server terminated with result: {:?}", result);
return result;
I have added debug traces after core.run(). It executes immediately after a request and prints:
Server terminated with result: Ok(())
The result is handled by the caller: it is printed using the error-chain error-printing pattern. Since the result is Ok(()), nothing is printed by the application and it just exits, but the added trace line now prints as shown above.
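For reference, the caller is roughly the standard error-chain quick-start pattern, something along these lines (simplified, not my exact code; run() stands in for the function that returns the Result):

fn main() {
    if let Err(ref e) = run() {
        println!("error: {}", e);
        for cause in e.iter().skip(1) {
            println!("caused by: {}", cause);
        }
        std::process::exit(1);
    }
}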
let server = listener.incoming()
    .into_future()
    .map_err(|_| unreachable!());
let server = server
    .and_then(|(item, _incoming)| {
        let (stream, _) = item.unwrap();
        let service = Server::new();
        Http::<hyper::Chunk>::new().serve_connection(stream, service)
            // ignore Opaque from hyper::server, will likely get changed in the future
            // see more here: https://users.rust-lang.org/t/what-is-hyper-server-unnameable-opaque/15329
            .map(|_| ())
    });
let result = core.run(server).chain_err(|| "launching the server");
This doesn't look right. You're turning the Incoming stream into a future; this future resolves with the first connection and the rest of the stream (_incoming in your code). You then run the client connection handling on the first accepted stream. Once that resolves, the future completes, and your server exits - there's nothing keeping the rest of the Incoming alive. So your server would exit as soon as the first client closes the connection, even if you consume all the body data.
This is exactly what I suspected upthread - the client connection is essentially the root future.
The code should instead look something like:
let listener: TcpListener = TcpListener::bind(&address, &handle)
    .chain_err(|| format!("binding to {}", address))?;
let server = listener.incoming().for_each(|(stream, _addr)| {
    let service = Server::new();
    Http::<hyper::Chunk>::new().serve_connection(stream, service)
        // ignore Opaque from hyper::server, will likely get changed in the future
        // see more here: https://users.rust-lang.org/t/what-is-hyper-server-unnameable-opaque/15329
        .map(|_| ())
});
let result = core.run(server).chain_err(|| "launching the server");
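To see the difference, into_future() turns the whole stream into a single future that resolves with just the first element plus the remainder, and nothing drives that remainder unless you do it yourself. A tiny untested sketch with futures 0.1:

extern crate futures;
use futures::{stream, Future, Stream};

fn main() {
    let s = stream::iter_ok::<_, ()>(vec![1, 2, 3]);
    // into_future() resolves once, with (first_item, rest_of_stream)
    let (first, _rest) = s.into_future().wait().ok().unwrap();
    assert_eq!(first, Some(1)); // _rest still holds 2 and 3, but nobody polls it
}

for_each, by contrast, keeps pulling elements until the stream ends.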
Thank you very much, that helped. I thought into_future would keep emitting futures continuously, not that it was a one-off job.
I needed to map the error to fix the compilation:
Http::<hyper::Chunk>::new().serve_connection(stream, service)
    // ignore Opaque from hyper::server, will likely get changed in the future
    // see more here: https://users.rust-lang.org/t/what-is-hyper-server-unnameable-opaque/15329
    .map(|_| ())
    .map_err(|e| io::Error::new(io::ErrorKind::Other, e));
I am not sure whether this is the correct error-handling approach, or whether I should add traces in the conversion closure or handle it differently?
Actually, I think you want the following instead (haven't tried compiling, just the idea):
let server = listener.incoming().for_each(|(stream, _addr)| {
    let service = Server::new();
    let serve_con = Http::<hyper::Chunk>::new().serve_connection(stream, service)
        // ignore Opaque from hyper::server, will likely get changed in the future
        // see more here: https://users.rust-lang.org/t/what-is-hyper-server-unnameable-opaque/15329
        .map(|_| ())
        .map_err(|e| {
            // handle it somehow - probably just a log and/or whatever alerting mechanism
            ()
        });
    // Handle the client in the background
    handle.spawn(serve_con);
    // Keep accepting new connections immediately
    Ok(())
});
let result = core.run(server).chain_err(|| "launching the server");
for_each runs the future returned from each iteration to completion before taking the next element from the stream - we don't actually want that. We want the different clients to be serviced in the background, which is what spawn()'ing them achieves. The error handling has to be fully contained in the future you hand to spawn(), because spawn() only accepts futures with no errors (i.e. Error = ()).
You'll probably want to handle the error by simply logging it - I don't see the need for anything beyond that.
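For instance (untested sketch, assuming you already have the log crate wired up), the map_err in the snippet above could simply become:

.map_err(|e| {
    // just record the failure; the spawned task itself must not return an error
    error!("error serving connection: {}", e);
})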