How to use hyper, help me


#1

I use hyper 0.11.2, look at the demo.

struct HelloWorld {
}

impl HelloWorld {

    pub fn new() -> Self {
        HelloWorld {
        }
    }
}

impl Service for HelloWorld {
    type Request = Request;
    type Response = Response;
    type Error = hyper::Error;
    type Future = BoxFuture<Response, hyper::Error>;

    fn call(&self, req: Request) -> Self::Future {
        info!("{}", req.method());
        info!("{}", req.headers());
        info!("{}", req.path());
        let body = req.body();
        let future = body.concat2().map(|chunk| {
            let content = String::from_utf8(chunk.to_vec());
            content.unwrap_or("".to_string())
        }).map(move |content| {
            let back = "hello world".to_string();
            //do something long time, this will block all other new requests, my god. what should i do.
            //Is hyper only has one worker thread?
            use std::thread;
            use std::time::Duration;
            thread::sleep(Duration::from_secs(60));
            let res = Response::new().with_body(back);
            res
        });
        return future.boxed();
    }
}

struct MyNewService {
}

impl MyNewService {

    pub fn new() -> Self {
        MyNewService {
        }
    }
}

impl NewService for MyNewService {
    type Request = Request;
    type Response = Response;
    type Error = hyper::Error;
    type Instance = HelloWorld;

    fn new_service(&self) -> Result<Self::Instance, std::io::Error> {
        return Ok(HelloWorld::new());
    }
}

fn main() {
    let _ = init_base_log();
    let addr = "127.0.0.1:3000".parse().unwrap();
    let server = Http::new().keep_alive(true).bind(&addr, MyNewService::new()).unwrap();
    server.run().unwrap();
}

the sleep will block the woker thread, and the other new request will wait util the sleep ends. What i need is that, the sleep not blocks the other new reuqests. all requests should block at the sleep.
when two requests hit one by one.
the output

[2017-09-13 12:47:51.490748700] [INFO] [hyper_new] /
[2017-09-13 12:48:51.459604000] [INFO] [hyper::http::request] Request::new: addr=127.0.0.1:51898, "GET / HTTP/1.1"
[2017-09-13 12:48:51.459604000] [INFO] [hyper_new] GET
[2017-09-13 12:48:51.459604000] [INFO] [hyper_new] Host: localhost:3000
User-Agent: Mozilla/5.0 (Windows NT 10.0; WOW64; rv:55.0) Gecko/20100101 Firefox/55.0
Accept: text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8
Accept-Language: zh-CN,zh;q=0.8,en-US;q=0.5,en;q=0.3
Accept-Encoding: gzip, deflate
Connection: keep-alive
Upgrade-Insecure-Requests: 1

[2017-09-13 12:48:51.459604000] [INFO] [hyper_new] /
[2017-09-13 12:49:51.486839400] [INFO] [hyper::http::request] Request::new: addr=127.0.0.1:51892, "GET /favicon.ico HTTP/1.1"
[2017-09-13 12:49:51.486839400] [INFO] [hyper_new] GET
[2017-09-13 12:49:51.486839400] [INFO] [hyper_new] Host: localhost:3000
Connection: keep-alive
User-Agent: Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/60.0.3112.113 Safari/537.36
Accept: image/webp,image/apng,image/*,*/*;q=0.8
Referer: http://localhost:3000/
Accept-Encoding: gzip, deflate, br
Accept-Language: zh-CN,zh;q=0.8

the second request is almost two seconds after the first one.
The second request header log should print a few seconds after the first, but the logs shows that, util the first request ends, hyper then start handle the second request.


#2

The issue here is that under the hood hyper's async services are all tracked by an event loop on a single thread. See tokio.rs for more details. It’s a big topic.

When you call thread::sleep you’re blocking that single thread tracking all requests and none of them can progress.

Instead, have a look at tokio-timer which will let you wait without blocking other requests.


#3

You can’t use the std synchronization primitives in combination with the asynchronous primitives from futures. Take a look at https://crates.io/crates/tokio-timer.


#4

All my knowledge from nodejs and c tells me that, async package uses the model that, a single thread handles the events loop and a thread pool handles the ture work. If i didn’t misunderstand you, you means that, tokio(or furtures) only has a thread handles the events but has not a thead pool and if i have a heavy work, I should create my own thread, and only send msg to tokio. Is this right? thanks a lot.


#5

Yes, that is pretty much it. Tokio does not distribute work on a thread pool by default, every future continuation runs on the event loop thread. If you want to dispatch to another thread, you will need to do so explicitly using a futures-aware scheduler like the futures-cpupool crate.

The rationale for this is that if there is only a little CPU work to do in the future continuation, as is common in IO-bound programs, dispatching to a thread pool causes scheduling and synchronization overhead and makes the programming model more cumbersome for no good reason.


#6

Thanks, i finally get the job done and understand the model of tokio.

impl Service for HelloWorld {
    type Request = Request;
    type Response = Response;
    type Error = hyper::Error;
    type Future = BoxFuture<Response, hyper::Error>;

    fn call(&self, req: Request) -> Self::Future {
        info!("{}", req.method());
        info!("{}", req.headers());
        info!("{}", req.path());
        let body = req.body();
        let thread_pool = self.thread_pool.clone();
        let future = body.concat2().map(|chunk| {
            let content = String::from_utf8(chunk.to_vec());
            content.unwrap_or("".to_string())
        }).and_then(move |content|{
            let msg = thread_pool.spawn_fn(move || {
                use std::thread;
                use std::time::Duration;
                thread::sleep(Duration::from_secs(30));
                Ok("hello world".to_string())
            });
            msg
        }).map(move |back| {
            let res = Response::new().with_body(back);
            res
        });
        return future.boxed();
    }
}