Reading json body in Hyper Service.call


#1

Hello, I’ve been trying to do this for a while. It’s a hobby. Not a real project, but I’m stuck. Right now I don’t even care that the code blocks until it reads the body, the problem is: it’s blocking forever!

impl Service for AlphaBravo {
...
type Future = Box<::futures::Future<Item = Self::Response, Error = Self::Error>>;
...
fn call(&self, req: Request) -> Self::Future {
...
    Box::new(match (method, uri.path()) {
            (Post, "/get-token") => {
            let full_body = body.map_err(|_| ()).fold(vec![], |mut acc, chunk| {
			    acc.extend_from_slice(&chunk);
			    Ok(acc)
			}).and_then(|v| String::from_utf8(v).map_err(|_| ())).wait().unwrap();
            let user_json: serde_json::Value = serde_json::from_str(&full_body).unwrap();
            let token = core_api::api_get_token(
                serde_json::to_string(&user_json["user"]).unwrap(),
                serde_json::to_string(&user_json["password"]).unwrap(),
            );
            futures::future::ok(Response::new().with_status(StatusCode::Created).with_body(
                token,
            ))
        }
...

It’s blocking when I do .wait() and it never returns
My testing data is:
curl -X POST -H “Content-Length: 34” -i http://127.0.0.1:8080/get-token -d’{“user”:“Mr1”,“password”:“123123”}’

Any ideas? Of course I obviously prefer non-blocking code, but I’m starting with Rust and Hyper, so even this code is turning out to be challenging.


#2

I was able to “fix” the problem with the solutions posted here:


I had several problems in my code. The main problem was that I didn’t need to wait. I could have just returned a future like in the examples below. The other problem (the reason I didn’t do it like the examples below on the first place) was that, when I used something similar to the code in the examples below, I was boxing the entire match(){}… and that returned a compilation error because it was expecting a FutureResult and found a future::AndThen.
Moving the Box::new() inside the “match” fixed the problem I had while trying to return a future (something I tried and I couldn’t get to compile)
The thing is… I don’t really understand how moving the Box::new() inside the match code changes the end result so it compiles. So… I’m still looking for anyone to explain how that code change compiles in one way and not the other.

Thank you!

Here is the code for future reference:
With CpuPool


extern crate futures;
extern crate hyper;
extern crate env_logger;
#[macro_use] extern crate log;
extern crate tokio_core;
extern crate futures_cpupool;

use hyper::{Post, StatusCode};
use hyper::header::ContentLength;
use hyper::server::{Http, Service, Request, Response};
use futures::{future, Future, Stream};
use futures_cpupool::CpuPool;

#[derive(Clone)]
struct Echo {
    thread_pool: CpuPool,
}

impl Service for Echo {
    type Request = Request;
    type Response = Response;
    type Error = hyper::Error;
    type Future = futures::BoxFuture<Response, hyper::Error>;

    fn call(&self, req: Request) -> Self::Future {
        let (method, uri, _version, headers, body) = req.deconstruct();
        match (method, uri.path()) {
            (Post, "/echo") => {
                let mut res = Response::new();
                if let Some(len) = headers.get::<ContentLength>() {
                    debug!("len:{}", len);
                    res.headers_mut().set(len.clone());
                }
                self.thread_pool.spawn_fn(move || {
                    let v = body.collect().wait()?;
                    let value = v.iter().fold(vec![], |mut acc, chunk| {
                        acc.extend_from_slice(chunk.as_ref());
                        acc
                    });
                    debug!("value: {:?}", &value);
                    Ok(res.with_body(value))
                }).boxed()
            },
            _ => future::ok(Response::new().with_status(StatusCode::NotFound)).boxed()
        }
    }
}

fn main() {
    env_logger::init().unwrap();
    let addr = "127.0.0.1:1337".parse().unwrap();
    let thread_pool = CpuPool::new_num_cpus();
    let server = Http::new().bind(&addr, move || Ok(Echo { thread_pool: thread_pool.clone() })).unwrap();
    println!("Listening on http://{}", server.local_addr().unwrap());
    server.run().unwrap();
}

Without CpuPool


extern crate env_logger;
#[macro_use] extern crate log;
extern crate futures;
extern crate hyper;

use hyper::{Post, StatusCode};
use hyper::header::ContentLength;
use hyper::server::{Http, Service, Request, Response};
use futures::{future, Future, Stream};

#[derive(Clone)]
struct Echo;

impl Service for Echo {
    type Request = Request;
    type Response = Response;
    type Error = hyper::Error;
    type Future = futures::BoxFuture<Response, hyper::Error>;

    fn call(&self, req: Request) -> Self::Future {
        let (method, uri, _version, headers, body) = req.deconstruct();
        match (method, uri.path()) {
            (Post, "/echo") => {
                let mut res = Response::new();
                let vec;
                if let Some(len) = headers.get::<ContentLength>() {
                    vec = Vec::with_capacity(**len as usize);
                    res.headers_mut().set(len.clone());
                } else {
                    vec = vec![];
                }
                body.fold(vec, |mut acc, chunk| {
                    acc.extend_from_slice(chunk.as_ref());
                    Ok::<_, hyper::Error>(acc)
                }).and_then(move |value| {
                    debug!("value: {:?}", &value);
                    Ok(res.with_body(value))
                }).boxed()
            },
            _ => future::ok(Response::new().with_status(StatusCode::NotFound)).boxed()
        }
    }
}

fn main() {
    env_logger::init().unwrap();
    let addr = "127.0.0.1:1337".parse().unwrap();
    let server = Http::new().bind(&addr, move || Ok(Echo)).unwrap();
    println!("Listening on http://{}", server.local_addr().unwrap());
    server.run().unwrap();
}