Hyper server silently crashes when the request body is not fully consumed. Is that the right behavior? How to fix?


#1

I have a web server written with hyper. To mitigate the memory overflow that a large request body can cause, I interrupt the stream by stopping consumption when I detect that the body is too large. This logic causes the process to terminate silently if a client submits a request larger than max_size. Is it expected behavior for the hyper server to terminate the process?

impl Server {
    fn get_body(&self, body: hyper::Body) -> Box<Future<Item=String, Error=errors::Error>> {
        let mut sum_size = 0;
        let max_size = 1024 * 1024;
        let chain = body.and_then(move |c| {
            sum_size += c.len(); // Chunk derefs to [u8]
            if sum_size > max_size {
                futures::future::err(hyper::error::Error::TooLarge)
            } else {
                futures::future::ok(c)
            }
        });

        let full_body = chain.concat2()
            .map(|chunk| {
                let v = chunk.to_vec();
                String::from_utf8_lossy(&v).to_string()
            });
        Box::new(full_body.then(|r| Ok(r.chain_err(|| "fetching request body")?)))
    }
}
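The capping logic in get_body can be sketched with plain std Rust, independent of futures: accumulate chunk lengths and reject once the running total exceeds the cap. This is an illustrative sketch only (collect_capped is a made-up name, not hyper API); the futures-based version streams chunks the same way:

```rust
// Sketch (std-only, names illustrative) of the same size-capping idea as
// get_body: accumulate chunk lengths and reject once the total exceeds the cap.
fn collect_capped(
    chunks: impl IntoIterator<Item = Vec<u8>>,
    max_size: usize,
) -> Result<Vec<u8>, String> {
    let mut out = Vec::new();
    for c in chunks {
        if out.len() + c.len() > max_size {
            return Err("request body too large".to_string());
        }
        out.extend_from_slice(&c);
    }
    Ok(out)
}

fn main() {
    // Two 10-byte chunks fit under a 64-byte cap...
    assert!(collect_capped(vec![vec![1u8; 10], vec![2u8; 10]], 64).is_ok());
    // ...but two 40-byte chunks exceed it, so consumption stops with an error.
    assert!(collect_capped(vec![vec![0u8; 40], vec![0u8; 40]], 64).is_err());
    println!("ok");
}
```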


impl hyper::server::Service for Server {
    type Request = Request;
    type Response = Response;
    type Error = hyper::Error;
    type Future = Box<Future<Item=Self::Response, Error=Self::Error>>;

    fn call(&self, _req: Request) -> Box<Future<Item=Self::Response, Error=Self::Error>> {
        let resp = self.get_body(_req.body()).map(|_| hyper::StatusCode::NotFound);
        let resp = resp.then(move |r: errors::Result<hyper::StatusCode>| {
            match r {
                Ok(s) => {
                    Ok(Response::new().with_status(s))
                }
                Err(_) => {
                    Ok(Response::new().with_status(hyper::StatusCode::BadRequest))
                }
            }
        });
        Box::new(resp)
    }
}

In fact, the process silently exits with code 0 on the very first request if the server has the following implementation (not consuming the request body at all). I think it is a bug, but maybe I am doing something wrong. Could you please suggest how to fix it?

impl hyper::server::Service for Server {
    type Request = Request;
    type Response = Response;
    type Error = hyper::Error;
    type Future = Box<Future<Item=Self::Response, Error=Self::Error>>;

    fn call(&self, _req: Request) -> Box<Future<Item=Self::Response, Error=Self::Error>> {
        let resp = futures::future::ok(hyper::StatusCode::NotFound);
        let resp = resp.then(move |r: errors::Result<hyper::StatusCode>| {
            match r {
                Ok(s) => {
                    Ok(Response::new().with_status(s))
                }
                Err(_) => {
                    Ok(Response::new().with_status(hyper::StatusCode::BadRequest))
                }
            }
        });
        Box::new(resp)
    }
}

#2

As a start, enable trace level logging and see what hyper (and maybe tokio) are spitting out. Paste it here.

It almost sounds like an error is causing your entire event loop (reactor) to exit, as if the request handling were the root future (it shouldn’t be - hyper spawns connection handling as a background task).


#3

I have not done it before. Does it mean that I have to enable and configure one of the log implementations listed here: https://github.com/rust-lang-nursery/log?


#4

Yes


#5

Here is the log file for the last simpler example (i.e. server does not consume body at all):

2018-02-28T11:11:26.528237600+13:00 - TRACE - registering with poller
2018-02-28T11:11:26.530236300+13:00 - TRACE - registering with poller
2018-02-28T11:11:26.533238000+13:00 - DEBUG - adding a new I/O source
2018-02-28T11:11:26.534237500+13:00 - TRACE - registering with poller
2018-02-28T11:11:26.534237500+13:00 - TRACE - register Token(2) Readable | Writable
2018-02-28T11:11:26.535236700+13:00 - TRACE - set readiness to (empty)
2018-02-28T11:11:26.535236700+13:00 - TRACE - scheduling an accept
2018-02-28T11:11:26.536235900+13:00 - DEBUG - consuming notification queue
2018-02-28T11:11:26.536235900+13:00 - DEBUG - scheduling direction for: 0
2018-02-28T11:11:26.537237000+13:00 - DEBUG - blocking
2018-02-28T11:11:26.537237000+13:00 - TRACE - select; timeout=Some(Duration { secs: 0, nanos: 0 })
2018-02-28T11:11:26.538235400+13:00 - TRACE - polling IOCP
2018-02-28T11:11:26.538235400+13:00 - TRACE - returning
2018-02-28T11:11:26.540237500+13:00 - DEBUG - loop poll - Duration { secs: 0, nanos: 2553072 }
2018-02-28T11:11:26.540237500+13:00 - DEBUG - loop time - Instant { t: 10440944989569 }
2018-02-28T11:11:26.541236700+13:00 - TRACE - event Readable Token(0)
2018-02-28T11:11:26.542235900+13:00 - DEBUG - consuming notification queue
2018-02-28T11:11:26.542235900+13:00 - DEBUG - loop process - 1 events, Duration { secs: 0, nanos: 2157038 }
2018-02-28T11:11:26.543240200+13:00 - TRACE - select; timeout=Some(Duration { secs: 0, nanos: 0 })
2018-02-28T11:11:26.543240200+13:00 - TRACE - polling IOCP
2018-02-28T11:11:26.544236500+13:00 - TRACE - returning
2018-02-28T11:11:26.544236500+13:00 - DEBUG - loop poll - Duration { secs: 0, nanos: 1525790 }
2018-02-28T11:11:26.545238600+13:00 - DEBUG - loop time - Instant { t: 10440945000948 }
2018-02-28T11:11:26.545238600+13:00 - DEBUG - loop process - 0 events, Duration { secs: 0, nanos: 1235875 }
2018-02-28T11:11:26.546240400+13:00 - TRACE - select; timeout=None
2018-02-28T11:11:26.546240400+13:00 - TRACE - polling IOCP
2018-02-28T11:11:36.083988700+13:00 - TRACE - select; -> got overlapped
2018-02-28T11:11:36.084999200+13:00 - TRACE - finished an accept
2018-02-28T11:11:36.086006100+13:00 - TRACE - set readiness to Readable
2018-02-28T11:11:36.086006100+13:00 - TRACE - returning
2018-02-28T11:11:36.086006100+13:00 - DEBUG - loop poll - Duration { secs: 9, nanos: 540223019 }
2018-02-28T11:11:36.086986700+13:00 - DEBUG - loop time - Instant { t: 10440971167171 }
2018-02-28T11:11:36.086986700+13:00 - TRACE - event Readable Token(2)
2018-02-28T11:11:36.087989200+13:00 - DEBUG - notifying a task handle
2018-02-28T11:11:36.087989200+13:00 - DEBUG - loop process - 1 events, Duration { secs: 0, nanos: 1724901 }
2018-02-28T11:11:36.088988400+13:00 - TRACE - select; timeout=Some(Duration { secs: 0, nanos: 0 })
2018-02-28T11:11:36.088988400+13:00 - TRACE - polling IOCP
2018-02-28T11:11:36.089989000+13:00 - TRACE - returning
2018-02-28T11:11:36.089989000+13:00 - DEBUG - loop poll - Duration { secs: 0, nanos: 1503544 }
2018-02-28T11:11:36.090993000+13:00 - DEBUG - loop time - Instant { t: 10440971177168 }
2018-02-28T11:11:36.090993000+13:00 - TRACE - event Readable Token(1)
2018-02-28T11:11:36.090993000+13:00 - DEBUG - loop process - 1 events, Duration { secs: 0, nanos: 1294222 }
2018-02-28T11:11:36.091986000+13:00 - TRACE - set readiness to (empty)
2018-02-28T11:11:36.091986000+13:00 - TRACE - scheduling an accept
2018-02-28T11:11:36.092985900+13:00 - DEBUG - adding a new I/O source
2018-02-28T11:11:36.093991700+13:00 - TRACE - registering with poller
2018-02-28T11:11:36.093991700+13:00 - TRACE - register Token(4) Readable | Writable
2018-02-28T11:11:36.094989800+13:00 - TRACE - set readiness to (empty)
2018-02-28T11:11:36.095989700+13:00 - TRACE - scheduling a read
2018-02-28T11:11:36.095989700+13:00 - TRACE - set readiness to Readable
2018-02-28T11:11:36.095989700+13:00 - TRACE - set readiness to Readable | Writable
2018-02-28T11:11:36.096987500+13:00 - DEBUG - dropping I/O source: 0
2018-02-28T11:11:36.097991400+13:00 - TRACE - cancelling active TCP accept
2018-02-28T11:11:36.097991400+13:00 - DEBUG - scheduling direction for: 1
2018-02-28T11:11:36.097991400+13:00 - DEBUG - blocking
2018-02-28T11:11:36.098987000+13:00 - TRACE - select; timeout=Some(Duration { secs: 0, nanos: 0 })
2018-02-28T11:11:36.099986900+13:00 - TRACE - polling IOCP
2018-02-28T11:11:36.099986900+13:00 - TRACE - select; -> got overlapped
2018-02-28T11:11:36.100991200+13:00 - TRACE - finished an accept
2018-02-28T11:11:36.102989300+13:00 - TRACE - set readiness to Readable
2018-02-28T11:11:36.103992100+13:00 - TRACE - returning
2018-02-28T11:11:36.104988800+13:00 - DEBUG - loop poll - Duration { secs: 0, nanos: 5669564 }
2018-02-28T11:11:36.104988800+13:00 - DEBUG - loop time - Instant { t: 10440971217327 }
2018-02-28T11:11:36.105987200+13:00 - TRACE - event Readable | Writable Token(4)
2018-02-28T11:11:36.106986800+13:00 - DEBUG - notifying a task handle
2018-02-28T11:11:36.106986800+13:00 - DEBUG - loop process - 1 events, Duration { secs: 0, nanos: 2313482 }
2018-02-28T11:11:36.107989600+13:00 - TRACE - select; timeout=Some(Duration { secs: 0, nanos: 0 })
2018-02-28T11:11:36.107989600+13:00 - TRACE - polling IOCP
2018-02-28T11:11:36.108989200+13:00 - TRACE - returning
2018-02-28T11:11:36.108989200+13:00 - DEBUG - loop poll - Duration { secs: 0, nanos: 1499533 }
2018-02-28T11:11:36.109989500+13:00 - DEBUG - loop time - Instant { t: 10440971229135 }
2018-02-28T11:11:36.109989500+13:00 - TRACE - event Readable Token(1)
2018-02-28T11:11:36.110986500+13:00 - DEBUG - loop process - 1 events, Duration { secs: 0, nanos: 1774132 }
2018-02-28T11:11:36.112987800+13:00 - TRACE - set readiness to Readable
2018-02-28T11:11:36.112987800+13:00 - TRACE - scheduling a write of 82 bytes
2018-02-28T11:11:36.119990700+13:00 - TRACE - done immediately with 82 bytes
2018-02-28T11:11:36.122990800+13:00 - TRACE - set readiness to Readable | Writable

#6

Hmm, nothing at all from hyper itself? Strange. How did you configure the logging?

How are you launching the hyper server? IIRC, you were starting the reactor yourself and then handling accepted connections yourself.

@seanmonstar may have a good idea about what you’re seeing.


#7

I am trying another logger backend. I found that stderrlog is very limited and buggy (e.g. it does not log messages from my module).


#8

So were you able to get any hyper logs?

BTW, looks like hyper supports this out of the box: https://docs.rs/hyper/0.11.18/hyper/server/struct.Http.html#method.max_buf_size.


#9

Sorry, I was distracted at work by other projects. Here is the hyper log.

TRACE 2018-02-28T03:25:12Z: hyper::proto::dispatch: Dispatcher::poll
TRACE 2018-02-28T03:25:12Z: hyper::proto::conn: Conn::read_head
TRACE 2018-02-28T03:25:12Z: hyper::proto::conn: Conn::write_queued()
TRACE 2018-02-28T03:25:12Z: hyper::proto::conn: flushed State { reading: Init, writing: Init, keep_alive: Busy, read_task: None }
TRACE 2018-02-28T03:25:12Z: hyper::proto::dispatch: Dispatcher::poll
TRACE 2018-02-28T03:25:12Z: hyper::proto::conn: Conn::read_head
DEBUG 2018-02-28T03:25:12Z: hyper::proto::io: read 470 bytes
TRACE 2018-02-28T03:25:12Z: hyper::proto::h1::parse: Request.parse([Header; 100], [u8; 470])
TRACE 2018-02-28T03:25:12Z: hyper::proto::h1::parse: Request.parse Complete(151)
DEBUG 2018-02-28T03:25:12Z: hyper::proto::io: parsed 5 headers (151 bytes)
DEBUG 2018-02-28T03:25:12Z: hyper::proto::conn: incoming body is content-length (319 bytes)
TRACE 2018-02-28T03:25:12Z: hyper::proto: expecting_continue(version=Http11, header=None) = false
TRACE 2018-02-28T03:25:12Z: hyper::proto: should_keep_alive(version=Http11, header=None) = true
TRACE 2018-02-28T03:25:12Z: hyper::proto::dispatch: body receiver dropped before eof, closing
TRACE 2018-02-28T03:25:12Z: hyper::proto::conn: State::close_read()
TRACE 2018-02-28T03:25:12Z: hyper::proto::io: WriteBuf reserving initial 8192
TRACE 2018-02-28T03:25:12Z: hyper::proto::h1::parse: ServerTransaction::encode has_body=false, method=Some(Post)
TRACE 2018-02-28T03:25:12Z: hyper::proto::conn: Conn::write_queued()
DEBUG 2018-02-28T03:25:12Z: hyper::proto::io: flushed 82 bytes
TRACE 2018-02-28T03:25:12Z: hyper::proto::conn: State::close()
TRACE 2018-02-28T03:25:12Z: hyper::proto::conn: maybe_notify; no task to notify
TRACE 2018-02-28T03:25:12Z: hyper::proto::conn: flushed State { reading: Closed, writing: Closed, keep_alive: Disabled, read_task: None }
TRACE 2018-02-28T03:25:12Z: hyper::proto::conn: shut down IO
TRACE 2018-02-28T03:25:12Z: hyper::proto::dispatch: Dispatch::poll done

PS It is strange: I can get only hyper logs with env_logger, or only tokio logs with stderrlog :slight_smile: but not both together. That is a separate story… I am investigating. In any case, the hyper logs from the terminated process are above.


#10

So that all looks fine to me, although I’m not a hyper expert. The body wasn’t consumed, as intended, and hyper drops the connection - seems pretty innocent.

So back to how you’re running hyper - can you show that code?

Also, you say the server exits with 0. Do you have logging for when the server’s run call exits? Do you handle/print errors there? As I mentioned upthread, it almost sounds like handling this connection was the root future on the event loop, rather than a background one.


#11

The server runs like the following:

    let listener: TcpListener = TcpListener::bind(&address, &handle)
        .chain_err(|| format!("binding to {}", address))?;
    let server = listener.incoming()
        .into_future()
        .map_err(|_| unreachable!());
    let server = server
        .and_then(|(item, _incoming)| {
            let (stream, _) = item.unwrap();
            let service = Server::new();
            Http::<hyper::Chunk>::new().serve_connection(stream, service)
                // ignore Opaque from hyper::server, will likely get changed in the future
                // see more here: https://users.rust-lang.org/t/what-is-hyper-server-unnameable-opaque/15329
                .map(|_| ())
        });
    let result = core.run(server).chain_err(|| "launching the server");
    debug!("Server terminated with result: {:?}", result);
    return result

I have added debug traces after core.run(). They execute immediately after a request and print:

Server terminated with result: Ok(())

The result is handled by the caller: it is printed using the error-chain error-printing pattern. Since the result is Ok(()), nothing is printed by the application and it exits, but the added trace line now prints as shown above.


#12
    let server = listener.incoming()
        .into_future()
        .map_err(|_| unreachable!());
    let server = server
        .and_then(|(item, _incoming)| {
            let (stream, _) = item.unwrap();
            let service = Server::new();
            Http::<hyper::Chunk>::new().serve_connection(stream, service)
                // ignore Opaque from hyper::server, will likely get changed in the future
                // see more here: https://users.rust-lang.org/t/what-is-hyper-server-unnameable-opaque/15329
                .map(|_| ())
        });
    let result = core.run(server).chain_err(|| "launching the server");

This doesn’t look right. You’re turning the Incoming stream into a future; that future resolves with the first connection and the rest of the stream (_incoming in your code). You then run the client-connection handling on the first accepted stream. Once that resolves, the future completes and your server exits - nothing keeps the rest of the Incoming alive. So your server exits as soon as the first client closes its connection, even if you consume all the body data.
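The behavior described above has a direct std analogy: Stream::into_future is like taking just the head of an iterator, while for_each-style consumption drains everything. A minimal std-only sketch (take_first is an illustrative helper, not a futures API):

```rust
// Iterator analogy (std only) for Stream::into_future: it resolves with
// (first item, rest of the stream) - like splitting a slice at its head -
// whereas for_each-style consumption handles every item.
fn take_first<'a>(conns: &'a [&'a str]) -> (Option<&'a str>, &'a [&'a str]) {
    match conns.split_first() {
        Some((first, rest)) => (Some(*first), rest),
        None => (None, &[]),
    }
}

fn main() {
    let incoming = ["conn1", "conn2", "conn3"];

    // into_future-like: only the first "connection" is taken; if the rest
    // is then dropped, the "server" is done after one client.
    let (first, rest) = take_first(&incoming);
    assert_eq!(first, Some("conn1"));
    assert_eq!(rest.len(), 2);

    // for_each-like: every "connection" gets handled.
    let handled: Vec<String> = incoming.iter().map(|c| format!("served {}", c)).collect();
    assert_eq!(handled.len(), 3);
    println!("ok");
}
```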

This is precisely what I suspected upthread - the client connection is essentially the root future.

The code should instead look something like:

    let listener: TcpListener = TcpListener::bind(&address, &handle)
        .chain_err(|| format!("binding to {}", address))?;
    let server = listener.incoming().for_each(|(stream, _addr)| {
        let service = Server::new();
        Http::<hyper::Chunk>::new().serve_connection(stream, service)
            // ignore Opaque from hyper::server, will likely get changed in the future
            // see more here: https://users.rust-lang.org/t/what-is-hyper-server-unnameable-opaque/15329
            .map(|_| ())
    });
    let result = core.run(server).chain_err(|| "launching the server");

#13

Thank you very much, that helped. I thought that into_future continuously emitted futures, but it is a one-off job.

I needed to map the error to fix the compilation:

    Http::<hyper::Chunk>::new().serve_connection(stream, service)
        // ignore Opaque from hyper::server, will likely get changed in the future
        // see more here: https://users.rust-lang.org/t/what-is-hyper-server-unnameable-opaque/15329
        .map(|_| ())
        .map_err(|e| io::Error::new(io::ErrorKind::Other, e));

I am not sure if this is the correct error-handling approach… or should I add traces in the conversion function, or handle it somehow differently?


#14

Actually, I think you want the following instead (I haven’t tried compiling it, just the idea):


    let server = listener.incoming().for_each(|(stream, _addr)| {
        let service = Server::new();
        let serve_con = Http::<hyper::Chunk>::new().serve_connection(stream, service)
            // ignore Opaque from hyper::server, will likely get changed in the future
            // see more here: https://users.rust-lang.org/t/what-is-hyper-server-unnameable-opaque/15329
            .map(|_| ())
            .map_err(|e| {
                // handle it somehow - probably just a log and/or whatever alerting mechanism
                ()
            });
        // Handle the client in the background
        handle.spawn(serve_con);
        // Keep accepting new connections immediately
        Ok(())
    });
    let result = core.run(server).chain_err(|| "launching the server");

for_each runs the future returned from each iteration to completion before taking the next element of the stream - we don’t actually want that. We want the different clients to be serviced in the background, which is what spawn()'ing them achieves. The error handling must be fully encapsulated by the closure given to spawn(), because the future it takes must have no errors (i.e. Error = ()).

You’ll probably want to handle the error by simply logging it - I don’t see the need for anything beyond that.
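The spawn-per-connection idea can be mimicked with std threads to show why backgrounding each client matters. This is an illustrative std-only analogy (serve_all is a made-up helper), not hyper/tokio code:

```rust
use std::sync::mpsc;
use std::thread;

// std-thread analogy of handle.spawn() per connection: the accept loop hands
// each "connection" to a background worker and immediately accepts the next,
// instead of serving one client to completion first.
fn serve_all(conns: &[&'static str]) -> Vec<String> {
    let (tx, rx) = mpsc::channel();
    let mut workers = Vec::new();
    for &conn in conns {
        let tx = tx.clone();
        // Like spawn()'ing serve_con: any error must be handled inside the task.
        workers.push(thread::spawn(move || {
            tx.send(format!("served {}", conn)).expect("send");
        }));
    }
    drop(tx); // close our sender so rx.iter() ends once all workers finish
    for w in workers {
        w.join().expect("join");
    }
    let mut served: Vec<String> = rx.iter().collect();
    served.sort();
    served
}

fn main() {
    assert_eq!(serve_all(&["a", "b", "c"]), vec!["served a", "served b", "served c"]);
    println!("ok");
}
```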


#15

Thank you! You are a great man!