Async: Abort Future in a hyper http server

In a hyper http server, I need to return a "timeout" string to the client if the CPU-intensive response takes more than 2 seconds, and most importantly, abort the computation.


pub(crate) use {
  futures::{
    future::{Either, Future, Abortable, AbortHandle, Aborted},
    pin_mut,
  },
  hyper::{service::Service, Body, Request, Response, Server},
  std::{
    pin::Pin,
    task::{Context, Poll},
  },
  structopt::StructOpt,
};

struct Svc {
  addr: SocketAddr,
}

impl Service<Request<Body>> for Svc {
  type Response = Response<Body>;
  type Error = hyper::Error;
  type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;

  fn poll_ready(&mut self, _: &mut Context) -> Poll<Result<(), Self::Error>> {
    Poll::Ready(Ok(()))
  }

  fn call(&mut self, req: Request<Body>) -> Self::Future {
    fn mk_response(s: String) -> Result<Response<Body>, hyper::Error> {
      Ok(Response::builder().body(Body::from(s)).unwrap())
    }

    async fn graph_req(req: Request<Body>) -> Result<Response<Body>, hyper::Error> {
      let res = tokio::task::spawn(async move {
        let mut cnt = 0;
        for i in 0..1_000_000_000 {  //CPU intensive computation
          cnt += 1;
        }
        cnt
      })
      .await;
      println!("{:?}", res);
      mk_response(res.into())
    }

    match req.uri().path() {
      _ => {
        let short = tokio::time::delay_for(time::Duration::from_secs(2)); //this is my 2 sec time out
        let long = graph_req(req); //this is my long computation
        let (abort_handle, abort_registration) = AbortHandle::new_pair();
        let long_abortable = Abortable::new(long, abort_registration);
        let fut = async move {
          pin_mut!(short);
          pin_mut!(long_abortable);
          match futures::future::select(long_abortable, short).await {
            Either::Left((_, _)) => mk_response("work".to_string()),
            Either::Right((_, _)) => {
              abort_handle.abort(); //this does not work
              mk_response("timeout".to_string()) //this correctly gets returned by select when the computation takes >2s
            }
          }
        };
        Box::pin(fut)
      }
    }
  }
}

The timeout works but the computation continues when I call .abort() above.

AbortHandle can only abort at a yield point. You could insert a call to tokio::task::yield_now periodically into your computation to allow it to work. (Though you still shouldn't really be running CPU intensive work on the Tokio core threadpool, it'd be better to use something other than AbortHandle for cancellation and move the computation to a spawn_blocking call instead).

1 Like

@Nemo157 spawn_blocking is what I initially wrote. But how would I abort the spawn_blocking computation? The docs say it cannot be aborted?

You can write your own code that makes it exit when signalled to exit. The thing in the docs is saying that there is no generic way to do so, but of course you can periodically check a boolean and return if it says you should exit.

@alice my computation is a (CPU-intensive) db transaction, that I can cancel by calling a function on it. But I cannot check a variable during the transaction periodically - it's an atomic operation. I need to call the cancelation function when the timer expires.

If the operation has no async yields points, then there is no automated way to cancel it, and you must implement your own cancellation signal. That's just how it is.

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.