How to handle HTTP cancellation and cleanup in hyper

Hi,

I’m building a web server based on hyper, and some of my API endpoints need to run a long loop. Sometimes the client may cancel the request, and in that case I’d like to perform some cleanup after the request is fully dropped.

The problem is: I’m not sure how hyper handles HTTP request cancellation, or how I can hook into it from my code.

Here’s a simplified version of my setup:

// api.rs
pub trait Action: Send + Sync {
    fn execute(&self, req: Req) -> impl Future<Output = ServerResult<Response<BoxBody>>> + Send;

    /// default requires authentication
    fn needs_auth(&self) -> bool {
        true
    }

    /// default not cancellable
    // TODO: handle request cancellation
    fn cancellable(&self) -> bool {
        false
    }

    fn auth(&self, req: &Req) -> ServerResult<()> {
        if let Some(auth_header) = req.headers().get(header::AUTHORIZATION)
            && !auth_header.is_empty()
            && auth::authorize(auth_header.to_str().unwrap_or_default())
        {
            return Ok(());
        }
        Err(ServerError { kind: ServerErrorKind::Unauthorized })
    }
}

And the server side:

// server.rs
type BoxBody = http_body_util::combinators::BoxBody<Bytes, Infallible>;
type ServerResult<T> = std::result::Result<T, ServerError>;
type Req = Request<Incoming>;

macro_rules! define_routes {
    ($($path: literal => $action_type: ty),* $(,)?) => {
        async fn route(req: Req, socket_addr: SocketAddr) -> ServerResult<Response<BoxBody>> {
            if Config::read(|c| c.is_only_inner_ip && !is_inner_ip(socket_addr)).await {
                return Ok(ResultResponse::error_with_code(StatusCode::FORBIDDEN));
            }
            match req.uri().path() {
                $(
                    $path => {
                        let action = <$action_type>::default();
                        if action.needs_auth() {
                            if let Err(e) = action.auth(&req) {
                                return handle(e);
                            }
                        }
                        match action.execute(req).await {
                            Ok(res) => Ok(res),
                            Err(e) => handle(e),
                        }
                    }
                )*
                _ => match api::asset_api::AssetAPI.execute(req).await {
                    Ok(res) => Ok(res),
                    Err(e) => handle(e),
                }
            }
        }
    };
}

define_routes! {
    "/path/to/endpoint" => api::OneActionType,
    // other apis ...
}

And the server run loop:

pub async fn run(
    mut shutdown_rx: broadcast::Receiver<()>,
    port: u16,
) -> std::result::Result<(), Error> {
    let addr: SocketAddr = ([127, 0, 0, 1], port).into();
    let listener = TcpListener::bind(addr).await.map_err(CommonError::from)?;
    let graceful = hyper_util::server::graceful::GracefulShutdown::new();
    info!("Listening on http://{addr}");

    loop {
        select! {
            accept_result = listener.accept().fuse() => {
                match accept_result {
                    Ok((stream, socket_addr)) => {
                        let io = TokioIo::new(stream);
                        let conn = http1::Builder::new().serve_connection(io, service_fn(move |req| {
                            route(req, socket_addr)
                        }));
                        let fut = graceful.watch(conn);
                        tokio::spawn(async move {
                            if let Err(e) = fut.await {
                                error!("Error serving connection: {e}");
                            }
                        });
                    }
                    Err(e) => {
                        error!("Failed to accept connection: {e:?}");
                        continue;
                    }
                }
            }

            _ = shutdown_rx.recv().fuse() => {
                drop(listener);
                info!("Server received shutdown signal");
                break;
            }
        }
    }

    graceful.shutdown().await;
    Ok(())
}

Suppose I have one API action like this:

pub struct OneAPI;

impl Action for OneAPI {
    async fn execute(&self, req: Req) -> ServerResult<Response<BoxBody>> {
        loop {
            if meets_some_condition {
                break;
            }
        }
        Ok(ResultResponse::success())
    }
}

Now, if the client cancels the request while this loop is running, I want to:

  1. Detect that the request was cancelled.
  2. Break out of the loop.
  3. Run some cleanup logic before returning.

What’s the recommended way to do this in hyper, ideally with minimal changes to my current structure?

Thanks!

A quick search finds Error::is_canceled

Presumably (API docs seem a bit thin) this is returned from the connection future (not much elsewhere it could come from), so your loop was effectively killed wherever it was at (which had to be an await point).

I don't think you're going to be able to get better control over when the service is dropped via hyper directly. If you only want to stop at the loop head, you're going to have to spawn inside that loop again and detect when the parent task is dropped. A simple option there is to keep a channel sender open in the parent and check in the loop head with try_recv that you get a "no message" error rather than a "sender dropped" error - but I might be missing a better option?


All of this is assuming you can't do the cleanup in sync code, in which case you can just implement Drop on a type in your service code!

Thanks for your enlightening reply! But my loop depends strongly on the surrounding context, so spawning the loop may not be a good choice. My solution is to introduce a new struct that implements Drop, and call spawn inside the drop fn:

struct CancelGuard {
    hash: String,
    // ... all the data I need when cancelling
    finished: bool,
}

impl Drop for CancelGuard {
    fn drop(&mut self) {
        if !self.finished {
            let hash = self.hash.clone();
            tokio::spawn(async move {
                // handle cancelling, using `hash` etc.
            });
        }
    }
}

And inside my execute fn:

let mut guard = CancelGuard {
    finished: false,
    // ...
};
loop {
    // do something
}
// Once the loop finishes, mark the guard as finished, so the cancel
// logic won't run when the guard is dropped
guard.finished = true;

And it seems to work, but I'm still awaiting better options!

Ah, I realize I made a mistake. My previous test looked like it worked, but in fact the code above is wrong. The Drop implementation of CancelGuard will only be called when the loop finishes and the guard goes out of scope — it just follows the normal Rust rules.

So I think the key to solving my problem is to find out how hyper actually behaves when an HTTP request is cancelled.

That is strange, futures do drop all locals in scope when they are dropped. Does your outer spawn get the error? Surely the service future has dropped at that point...

The point is that my service future isn't dropped even after the connection has closed! I suppose there's something wrong with my whole server structure.

pub async fn run(
    mut shutdown_rx: broadcast::Receiver<()>,
    port: u16,
) -> std::result::Result<(), Error> {
    let addr: SocketAddr = ([127, 0, 0, 1], port).into();
    let listener = TcpListener::bind(addr).await.map_err(CommonError::from)?;
    // disable graceful shutdown for test purpose
    // let graceful = hyper_util::server::graceful::GracefulShutdown::new();
    info!("Listening on http://{addr}");

    loop {
        select! {
            accept_result = listener.accept().fuse() => {
                match accept_result {
                    Ok((stream, socket_addr)) => {
                        debug!("Accepted connection from {socket_addr}");

                        let io = TokioIo::new(stream);
                        let conn = http1::Builder::new().serve_connection(io, service_fn(move |req| {
                            route(req, socket_addr)
                        }));
                        
                        tokio::spawn(async move {
                            if let Err(e) = conn.await {
                                error!("Error serving connection: {e}");
                            } else {
                                debug!("Connection from {socket_addr} closed");
                            }
                        });
                    }

                    Err(e) => {
                        error!("Failed to accept connection: {e:?}");
                        continue;
                    }
                }
            }

// ... handle server shutdown

    Ok(())
}

And my route function looks like:

async fn route(req: Req, socket_addr: SocketAddr) -> Result<Response<BoxBody>> {
    match req.uri().path() {
        "some/path" => {
            let action = API::default();
            match action.execute(req).await { Ok(..) => ..., Err(..) => ... }
        }
    }
}

struct TestGuard;
impl Drop for TestGuard {
    fn drop(&mut self) {
        debug!("dropped");
    }
}

struct API;
impl Action for API {
    async fn execute(&self, req: Req) -> Result<Response<BoxBody>> {
        let _guard = TestGuard;
        loop {
            // As I tested, the loop won't break even after the connection has closed
            if let Some(_state) = get_state().await { break; }
            sleep(Duration::from_secs(1)).await;
        }
        Ok(...)
    }
}

Ah, reading the actual source, I now believe the error you'd actually get is is_incomplete_message, which is http1 only - http2 has explicit stream close reasons, so you'd need to handle that differently.

That also looks like it explains why your service isn't getting killed: you need to actually read from the body, even if you expect no content (normally in this sort of situation you use chunked encoding, and send empty chunks as a keep-alive of sorts) - and that should make it a lot easier to break out of your loop!


Edit: just remembered that closing after chunk isn't an incomplete message either: you would just get the end of the stream. That still works though.

Yes! That's it, thanks!

Glad we muddled through it in the end! For being the primary http library it's surprisingly under-documented.
