I was able to "fix" the problem with the solutions posted here:
I had several problems in my code. The main one was that I didn't need to wait at all: I could have just returned a future, as in the examples below. The other problem (the reason I didn't do it that way in the first place) was that, when I tried something similar to the examples below, I was boxing the entire match { ... } expression. That produced a compilation error, because the compiler expected a FutureResult and found a future::AndThen.
Moving the Box::new() inside each arm of the match fixed the problem I had while trying to return a future (something I had tried before and couldn't get to compile).
What I still don't really understand is how moving the Box::new() inside the match changes the result so that it compiles. I'm still looking for someone to explain why the code compiles one way and not the other.
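My best guess so far, as a sketch rather than a definitive answer: every arm of a match must produce the same type, and each future combinator has its own concrete type. Boxing inside each arm lets every arm coerce to the same boxed trait object, while boxing the whole match comes too late, because the arms have already failed to agree. The example below reproduces the idea with standard-library iterators instead of futures, so it runs without any crates. The function name `numbers` and the routes are made up for illustration and are not part of hyper or futures.

```rust
use std::iter;

// Hypothetical stand-in for the service's `call` method: each arm builds a
// different concrete iterator type, much like each arm in the handler
// builds a different concrete future type.
fn numbers(route: &str) -> Box<dyn Iterator<Item = u32>> {
    match route {
        // Concrete type before boxing: Map<Range<u32>, closure>
        "/double" => Box::new((1u32..4).map(|n| n * 2)),
        // Concrete type before boxing: Once<u32>
        _ => Box::new(iter::once(0u32)),
    }
    // Writing `Box::new(match route { ... })` with unboxed arms is rejected:
    // the match itself needs ONE type, and Map<..> is not Once<..>.
}

fn main() {
    let doubled: Vec<u32> = numbers("/double").collect();
    let fallback: Vec<u32> = numbers("/missing").collect();
    println!("{:?} {:?}", doubled, fallback);
}
```

If this reading is right, the .boxed() calls in the code below play the same role as Box::new() here: each arm ends up as a BoxFuture, so the arms agree.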
Thank you!
Here is the code for future reference:
With CpuPool
extern crate futures;
extern crate hyper;
extern crate env_logger;
#[macro_use] extern crate log;
extern crate tokio_core;
extern crate futures_cpupool;

use hyper::{Post, StatusCode};
use hyper::header::ContentLength;
use hyper::server::{Http, Service, Request, Response};
use futures::{future, Future, Stream};
use futures_cpupool::CpuPool;

#[derive(Clone)]
struct Echo {
    thread_pool: CpuPool,
}

impl Service for Echo {
    type Request = Request;
    type Response = Response;
    type Error = hyper::Error;
    type Future = futures::BoxFuture<Response, hyper::Error>;

    fn call(&self, req: Request) -> Self::Future {
        let (method, uri, _version, headers, body) = req.deconstruct();
        // Each arm is boxed on its own, so both arms have the same type.
        match (method, uri.path()) {
            (Post, "/echo") => {
                let mut res = Response::new();
                if let Some(len) = headers.get::<ContentLength>() {
                    debug!("len:{}", len);
                    res.headers_mut().set(len.clone());
                }
                // The closure runs on a pool thread, so wait() blocks that
                // thread rather than the caller of call().
                self.thread_pool.spawn_fn(move || {
                    let v = body.collect().wait()?;
                    // Concatenate all body chunks into a single buffer.
                    let value = v.iter().fold(vec![], |mut acc, chunk| {
                        acc.extend_from_slice(chunk.as_ref());
                        acc
                    });
                    debug!("value: {:?}", &value);
                    Ok(res.with_body(value))
                }).boxed()
            },
            _ => future::ok(Response::new().with_status(StatusCode::NotFound)).boxed()
        }
    }
}

fn main() {
    env_logger::init().unwrap();
    let addr = "127.0.0.1:1337".parse().unwrap();
    let thread_pool = CpuPool::new_num_cpus();
    let server = Http::new().bind(&addr, move || Ok(Echo { thread_pool: thread_pool.clone() })).unwrap();
    println!("Listening on http://{}", server.local_addr().unwrap());
    server.run().unwrap();
}
Without CpuPool
extern crate env_logger;
#[macro_use] extern crate log;
extern crate futures;
extern crate hyper;

use hyper::{Post, StatusCode};
use hyper::header::ContentLength;
use hyper::server::{Http, Service, Request, Response};
use futures::{future, Future, Stream};

#[derive(Clone)]
struct Echo;

impl Service for Echo {
    type Request = Request;
    type Response = Response;
    type Error = hyper::Error;
    type Future = futures::BoxFuture<Response, hyper::Error>;

    fn call(&self, req: Request) -> Self::Future {
        let (method, uri, _version, headers, body) = req.deconstruct();
        // Each arm is boxed on its own, so both arms have the same type.
        match (method, uri.path()) {
            (Post, "/echo") => {
                let mut res = Response::new();
                // Pre-allocate the buffer when Content-Length is known.
                let vec;
                if let Some(len) = headers.get::<ContentLength>() {
                    vec = Vec::with_capacity(**len as usize);
                    res.headers_mut().set(len.clone());
                } else {
                    vec = vec![];
                }
                // No wait() here: the fold and and_then are chained and
                // returned as a future.
                body.fold(vec, |mut acc, chunk| {
                    acc.extend_from_slice(chunk.as_ref());
                    Ok::<_, hyper::Error>(acc)
                }).and_then(move |value| {
                    debug!("value: {:?}", &value);
                    Ok(res.with_body(value))
                }).boxed()
            },
            _ => future::ok(Response::new().with_status(StatusCode::NotFound)).boxed()
        }
    }
}

fn main() {
    env_logger::init().unwrap();
    let addr = "127.0.0.1:1337".parse().unwrap();
    let server = Http::new().bind(&addr, move || Ok(Echo)).unwrap();
    println!("Listening on http://{}", server.local_addr().unwrap());
    server.run().unwrap();
}
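For anyone who wants to try the chunk-accumulating fold on its own, here is a minimal sketch using plain byte slices and the standard library instead of a hyper body. The helper `concat_chunks` and its `expected_len` parameter are hypothetical names of mine; `expected_len` only stands in for the Content-Length value used above.

```rust
// Hypothetical helper: concatenates body chunks into one buffer.
// `expected_len` plays the role of the Content-Length header value.
fn concat_chunks(chunks: &[&[u8]], expected_len: Option<usize>) -> Vec<u8> {
    // Pre-allocate when the length is known, otherwise start empty.
    let buf = match expected_len {
        Some(len) => Vec::with_capacity(len),
        None => Vec::new(),
    };
    // Same shape as the fold in the handlers: append each chunk, pass the
    // accumulator along.
    chunks.iter().fold(buf, |mut acc, chunk| {
        acc.extend_from_slice(chunk);
        acc
    })
}

fn main() {
    let chunks: [&[u8]; 2] = [b"hello, ", b"world"];
    let body = concat_chunks(&chunks, Some(12));
    println!("{}", String::from_utf8_lossy(&body));
}
```

The difference from the stream version is that the stream's fold closure returns a Result (or a future) so that errors can be propagated, whereas the iterator fold returns the accumulator directly.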