How can I implement timeout middleware with hyper?

I would like to implement Timeout middleware, but I couldn't get it working by following the link below. Its implementation doesn't work well.

I was able to implement Logger middleware by myself, and I would like to implement Timeout middleware as well. Could you show me how?

use hyper::server::{Http, Request, Response, Service};
use std::fmt;

#[derive(Clone)]
pub struct Log<S> {
    upstream: S,
}

impl<S> Log<S> {
    pub fn new(upstream: S) -> Log<S> {
        Log { upstream: upstream }
    }
}

impl<S> Service for Log<S>
    where S: Service,
          S::Request: fmt::Debug,
{
    type Request = S::Request;
    type Response = S::Response;
    type Error = S::Error;
    type Future = S::Future;

    fn call(&self, request: Self::Request) -> Self::Future {
        println!("[REQUEST] {:?}", request);
        self.upstream.call(request)
    }
}

You can probably make the timeout middleware example work by using tokio_core::reactor::Timeout.

To use it, you need access to a Handle, which means you need to start hyper slightly differently than normal; see post #11 by vitalyd in the thread "Poll for incoming events and drive futures on the same thread?". Note the change to replace bind_connection with serve_connection mentioned at the end of that thread.

So you’d clone a Handle and give it to your NewService impl. It will in turn hand out a clone of it to every Service instance it creates. You should then be able to use Timeout in the call method to select between it and the upstream call.


Thanks a lot for your reply.

You can probably make the timeout middleware example work by using tokio_core::reactor::Timeout.

You mean the Timeout middleware example almost works, and I should use tokio_core::reactor::Timeout rather than tokio::timer::Timer, right?
I know that the tokio-core crate is scheduled for deprecation in favor of tokio, though...

To use it, you need access to a Handle - this means you need to start hyper slightly differently than normal: "Poll for incoming events and drive futures on the same thread?" Note the change to replace bind_connection with serve_connection mentioned at the end of the thread.

OK, I got it.

So you’d clone a Handle and give it to your NewService impl. It will in turn hand out a clone of it to every Service instance it creates. You should then be able to use Timeout in the call method to select between it and the upstream call.

I couldn't follow this part. Does "NewService impl" mean Log<S>, or Timeout<T>, for example?
And the select you mentioned is futures::future::Select, right?

Do you have a code example or snippet for Timeout middleware?

It will in turn hand out a clone of it to every Service instance it creates

Are there any side effects or other problems caused by this?

Here's a rough example I whipped up to give you an idea:

extern crate futures;
extern crate hyper;
extern crate tokio_core;

use futures::future::Either;
use futures::prelude::*;
use futures::Future;
use hyper::server::Service;
use std::time::Duration;
use tokio_core::reactor::{Handle, Timeout};

struct RealService(Handle);

impl Service for RealService {
    type Request = hyper::Request;
    type Response = hyper::Response;
    type Error = hyper::Error;
    type Future = Box<Future<Item = Self::Response, Error = Self::Error>>;

    fn call(&self, _req: Self::Request) -> Self::Future {
        // Pretend our response takes longer than `TimeoutMiddleware` waits.
        let timeout = Timeout::new(Duration::from_secs(4), &self.0).unwrap();
        let timeout = timeout.map_err(|_| unreachable!()).and_then(|_| {
            let mut resp = hyper::Response::new();
            resp.set_status(hyper::StatusCode::Ok);
            resp.set_body("some text");
            Ok(resp)
        });
        Box::new(timeout)
    }
}

pub struct TimeoutMiddleware<T> {
    upstream: T,
    delay: Duration,
    handle: Handle,
}

impl<T> TimeoutMiddleware<T> {
    pub fn new(upstream: T, handle: Handle, delay: Duration) -> Self {
        Self {
            upstream,
            handle,
            delay,
        }
    }
}

impl<T> Service for TimeoutMiddleware<T>
where
    T: Service,
    T::Response: Into<hyper::Response> + 'static,
    T::Future: 'static,
    T::Error: Into<hyper::Error>,
{
    type Request = T::Request;
    type Response = hyper::Response;
    type Error = hyper::Error;
    type Future = Box<Future<Item = Self::Response, Error = Self::Error>>;

    fn call(&self, req: Self::Request) -> Self::Future {
        let timeout = Timeout::new(self.delay, &self.handle).unwrap();
        Box::new(
            self.upstream
                .call(req)
                .select2(timeout)
                .then(|res| match res {
                    // upstream returned a value in time
                    Ok(Either::A((x, _))) => Ok(x.into()),
                    // upstream timed out - return a timed out response
                    Ok(Either::B((_, _))) => {
                        let mut resp = hyper::Response::new();
                        resp.set_status(hyper::StatusCode::RequestTimeout);
                        Ok(resp)
                    }
                    // upstream errored - propagate the error
                    Err(Either::A((e, _))) => Err(e.into()),
                    // Timeout never errors!
                    _ => unreachable!(),
                }),
        )
    }
}

fn main() {
    let addr = "127.0.0.1:8080".parse().unwrap();
    let mut core = tokio_core::reactor::Core::new().unwrap();
    let handle = core.handle();

    let listener = tokio_core::net::TcpListener::bind(&addr, &handle).unwrap();
    let server = listener.incoming().for_each(|(conn, _)| {
        hyper::server::Http::<hyper::Chunk>::new()
            .serve_connection(
                conn,
                TimeoutMiddleware::new(
                    RealService(handle.clone()),
                    handle.clone(),
                    Duration::from_secs(1),
                ),
            )
            .map_err(|e| {
                std::io::Error::new(
                    std::io::ErrorKind::Other,
                    format!("Failed to serve connection: {}", e),
                )
            })
    });
    core.run(server).unwrap();
}

Let me know if you have questions.


It's very kind of you. I tried out the example, and it works fine. But it's not exactly what I wanted.

I removed the Timeout in the RealService.call method and instead added a call to the function below, which takes a very long time. But the request doesn't time out.

fn heavy_workload() {
    for i in 0..1000000 {
        println!("{}", i);
    }
}

Is this because hyper and the futures crate run on a single thread? I'm familiar with Scala's Future, but I don't clearly understand how Rust's futures work. Do I need to use https://crates.io/crates/futures-cpupool?

Some more help, please.


P.S.

I tried out the code below using futures_cpupool, and I was able to get the result I was looking for. While heavy_workload was running, the timeout fired and the client received 408 Request Timeout.

But there is another problem. heavy_workload, which now runs on another thread, keeps running to completion instead of being aborted.

use futures_cpupool::CpuPool;

struct RealService(Handle, CpuPool);

impl Service for RealService {
    type Request = hyper::Request;
    type Response = hyper::Response;
    type Error = hyper::Error;
    type Future = Box<Future<Item = Self::Response, Error = Self::Error>>;

    fn call(&self, _req: Self::Request) -> Self::Future {
        // Run the blocking work on the pool so the event loop thread stays free.
        let pool = self.1.clone();
        let future = pool.spawn_fn(|| {
            heavy_workload();
            Ok(hyper::Response::new())
        });
        Box::new(future)
    }
}

I want to stop processing for HTTP requests that have exceeded a certain time (for example 100 ms) and return 408 Request Timeout to the client. What is the proper way?

It’s actually due to tokio (0.1)'s single-threaded nature. There’s a single thread running the event loop (represented by the Core struct). A future that hogs the CPU, like your example, will prevent the loop from processing other futures, including the Timeout.

Futures in Rust don’t run themselves - they need an executor (such as tokio or cpupool) to do that. I suggest reading the tokio docs to get an intro to how futures are executed, particularly when run on top of tokio.
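To make the "futures don't run themselves" point concrete, here is a minimal sketch. It uses the standard library's std::future::Future rather than the futures 0.1 trait discussed in this thread, and RecordPoll, NoopWake and poll_once are hypothetical names made up for the illustration. The idea carries over: constructing a future does nothing, and only a poll from an executor makes it progress.

```rust
use std::cell::Cell;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// A future whose only job is to record that it was polled (hypothetical).
struct RecordPoll<'a> {
    polled: &'a Cell<bool>,
}

impl<'a> Future for RecordPoll<'a> {
    type Output = u32;

    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<u32> {
        self.polled.set(true);
        Poll::Ready(7)
    }
}

/// A waker that does nothing; enough for a future that never waits.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls `fut` exactly once, playing the role of a very small executor.
fn poll_once<F: Future>(fut: Pin<&mut F>) -> Poll<F::Output> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    fut.poll(&mut cx)
}

fn main() {
    let polled = Cell::new(false);
    let mut fut = RecordPoll { polled: &polled };
    // Creating the future ran none of its code.
    assert!(!polled.get());
    // Only a poll from the "executor" makes it do anything.
    assert_eq!(poll_once(Pin::new(&mut fut)), Poll::Ready(7));
    assert!(polled.get());
}
```

In tokio 0.1 the Core plays the role of poll_once here, which is why a long-running call inside one future keeps every other future on that Core from being polled.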

The future representing the request handling will be dropped if the timeout future completes first. So whenever that expensive work is done it’ll essentially be ignored. But I suppose you actually want to preempt the expensive work as soon as you know it’s timed out? There are a couple of ways I can think of offhand.

Instead of running the expensive loop to completion, yield control back to the executor. Put your loop into a struct that implements Future and do only a bounded amount of work per poll. Every so often, yield back to the executor by returning Async::NotReady (after arranging to be polled again, e.g. with task::current().notify()). If the CpuFuture representing this work is dropped in the meantime, it’ll set a flag that it's been dropped, and the pool will then not poll your future again.
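A rough sketch of this first option follows. To keep it self-contained it again uses std::future::Future instead of futures 0.1, so Poll::Pending stands in for Async::NotReady and cx.waker().wake_by_ref() for task::current().notify(). ChunkedSum, NoopWake and drive_until are hypothetical names, and the busy-polling driver is only a stand-in for a real executor plus timer.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::time::{Duration, Instant};

/// Sums 0..total, but processes at most `chunk` items per poll.
struct ChunkedSum {
    next: u64,
    total: u64,
    chunk: u64,
    sum: u64,
}

impl Future for ChunkedSum {
    type Output = u64;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<u64> {
        let this = self.get_mut();
        let end = this.total.min(this.next.saturating_add(this.chunk));
        while this.next < end {
            this.sum = this.sum.wrapping_add(this.next);
            this.next += 1;
        }
        if this.next == this.total {
            return Poll::Ready(this.sum);
        }
        // Not finished: ask to be polled again, then yield to the executor.
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}

/// A waker that does nothing; the driver below simply polls in a loop.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls `fut` until it finishes or `timeout` elapses. On timeout the
/// future is dropped, so none of its remaining work ever runs.
fn drive_until<F: Future + Unpin>(mut fut: F, timeout: Duration) -> Option<F::Output> {
    let deadline = Instant::now() + timeout;
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(value) = Pin::new(&mut fut).poll(&mut cx) {
            return Some(value);
        }
        if Instant::now() >= deadline {
            return None;
        }
    }
}

fn main() {
    let small = ChunkedSum { next: 0, total: 10, chunk: 4, sum: 0 };
    assert_eq!(drive_until(small, Duration::from_secs(5)), Some(45));

    // Far too much work for 20 ms: the driver gives up and drops the future.
    let huge = ChunkedSum { next: 0, total: u64::MAX, chunk: 1_000, sum: 0 };
    assert_eq!(drive_until(huge, Duration::from_millis(20)), None);
}
```

The chunk size is the trade-off: smaller chunks let a timeout take effect sooner, at the cost of more polls.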

The second option would be a more explicit sharing of a flag that your loop checks. This flag can be, e.g., an AtomicBool that your loop checks periodically, and then is set by your code when the request times out.
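Here is a minimal sketch of that second option using only the standard library. A plain thread and a channel stand in for the CpuPool and the Timeout future, and run_cancellable is a hypothetical helper; heavy_workload is a variant of your function that takes the flag and an iteration count.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc;
use std::sync::Arc;
use std::thread;
use std::time::Duration;

/// Stand-in for the expensive loop: returns None if cancelled partway.
fn heavy_workload(cancelled: &AtomicBool, iterations: u64) -> Option<u64> {
    let mut sum = 0u64;
    for i in 0..iterations {
        // Check the flag every 1024 iterations to keep the overhead low.
        if i % 1024 == 0 && cancelled.load(Ordering::Relaxed) {
            return None;
        }
        sum = sum.wrapping_add(i);
    }
    Some(sum)
}

/// Runs the workload on another thread. If no result arrives within
/// `timeout`, sets the shared flag so the worker stops early.
fn run_cancellable(iterations: u64, timeout: Duration) -> Option<u64> {
    let cancelled = Arc::new(AtomicBool::new(false));
    let flag = Arc::clone(&cancelled);
    let (tx, rx) = mpsc::channel();
    let worker = thread::spawn(move || {
        // Ignore a send error; it only means nobody is waiting any more.
        let _ = tx.send(heavy_workload(&flag, iterations));
    });
    let result = match rx.recv_timeout(timeout) {
        Ok(value) => value,
        Err(_) => {
            // Timed out: tell the worker to stop.
            cancelled.store(true, Ordering::Relaxed);
            None
        }
    };
    // Joining only demonstrates that the worker exits promptly; a server
    // would send the 408 response here instead of waiting.
    worker.join().unwrap();
    result
}

fn main() {
    // Small job: finishes well within the timeout.
    assert_eq!(run_cancellable(10, Duration::from_secs(5)), Some(45));
    // Huge job: times out, and the worker stops shortly after the flag is set.
    assert_eq!(run_cancellable(u64::MAX, Duration::from_millis(20)), None);
}
```

In the hyper setup above, the store to the flag would go in the branch of TimeoutMiddleware that builds the RequestTimeout response, with the Arc<AtomicBool> shared between the middleware and the closure passed to spawn_fn.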


It was very helpful. Thank you very much.