Hi, guys! My service is based on the Hyper framework, and I want to add asynchronous saving of the request body to Redis. For this I'm using the redis_async library, and there are some examples of using it with the framework. The problem starts when I try to end one future chain with another future chain, the data_saving fn.
Questions:
- Should I just split the data_saving chain? If so, how can I do it? When I try to split it, I run into type problems. First I try to run the database connection future on its own, but I can't write its return type, because PairedConnectionBox is a private type of the library and there is no way to name it.
- Or can I run future -> future -> chain future -> future? I don't know how to do that.
- Or maybe there is another way to do this.
extern crate hyper;
extern crate futures;
#[macro_use]
extern crate serde_json;
extern crate queryst;
extern crate url;
#[macro_use]
extern crate base64;
extern crate tokio;
#[macro_use]
extern crate redis_async;
use futures::future;
use std::{thread, time};
use hyper::header::ContentType;
use hyper::server::{Http, Request, Response, Service};
use hyper::{Method, StatusCode};
use futures::future::{Future, FutureResult};
use futures::Stream;
use hyper::{Body, Chunk};
use url::form_urlencoded;
use std::collections::HashMap;
use queryst::parse;
use std::io::{self, copy, Read};
use std::env;
use std::str;
use std::error::Error;
use redis_async::client;

fn parse_json(form_chunk: Chunk) -> FutureResult<serde_json::Value, hyper::Error> {
    if let Ok(j) = serde_json::from_slice(form_chunk.as_ref()) {
        futures::future::ok(j)
    } else {
        futures::future::err(hyper::Error::from(io::Error::new(
            io::ErrorKind::InvalidInput,
            "Json is invalid",
        )))
    }
}

// The chain below is an and_then/then combinator, not a FutureResult,
// so this return type is where I get stuck.
fn data_saving(data: serde_json::Value) -> FutureResult<hyper::Response, hyper::Error> {
    let addr = env::args()
        .nth(1)
        .unwrap_or_else(|| "127.0.0.1:6379".to_string())
        .parse()
        .unwrap();
    let db_connection = client::paired_connect(&addr);
    db_connection
        .and_then(|connection| {
            // Placeholder command for now; `data` is not sent yet.
            connection.send::<redis_async::resp::RespValue>(resp_array!["SET", "key", "data"])
        })
        .then(|result| match result {
            // Saving succeeded, so report success to the client.
            Ok(_) => futures::future::ok(Response::new().with_status(StatusCode::Ok)),
            Err(e) => {
                println!("Unexpected error: {:?}", e);
                futures::future::ok(Response::new().with_status(StatusCode::InternalServerError))
            }
        })
}

struct Echo;

impl Service for Echo {
    type Request = Request;
    type Response = Response;
    type Error = hyper::Error;
    type Future = Box<Future<Item = Self::Response, Error = Self::Error>>;

    fn call(&self, req: Request) -> Self::Future {
        match (req.method(), req.path()) {
            (&Method::Post, "/post_data") => {
                let future = req
                    .body()
                    .concat2()
                    .and_then(parse_json)
                    .and_then(move |json| data_saving(json));
                Box::new(future)
            }
            _ => Box::new(futures::future::ok(
                Response::new().with_status(StatusCode::NotFound),
            )),
        }
    }
}

fn main() {
    let addr = "127.0.0.1:3000".parse().unwrap();
    println!("Server is running at {:?}", addr);
    let server = Http::new().bind(&addr, || Ok(Echo)).unwrap();
    server.run().unwrap();
}
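
On the first question (not being able to write the return type), one direction that might work is to box the whole chain, so the private connection type never appears in a signature. The sketch below only illustrates that idea and is not hyper 0.11 / futures 0.1 code: to stay self-contained it uses std::future from the standard library, and connect, send_set, BoxFut and block_on are hypothetical stand-ins, not redis_async APIs. With futures 0.1 the analogous return type would presumably be Box<Future<Item = Response, Error = hyper::Error>>, the same shape Service::Future already has in the code above.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Type-erased future: callers see only the output type, never the
/// concrete (possibly private or unnameable) type behind the box.
type BoxFut<T> = Pin<Box<dyn Future<Output = Result<T, String>>>>;

/// Hypothetical stand-in for a library call whose concrete future type
/// the caller cannot name (like the private connection future).
fn connect(addr: String) -> impl Future<Output = Result<String, String>> {
    async move {
        if addr.is_empty() {
            Err("empty address".to_string())
        } else {
            Ok(format!("conn:{}", addr))
        }
    }
}

/// Hypothetical stand-in for sending a command over the connection.
fn send_set(conn: String, value: String) -> impl Future<Output = Result<String, String>> {
    async move { Ok(format!("{} SET key {}", conn, value)) }
}

/// The whole chain is boxed, so the signature mentions none of the
/// intermediate future types. The u16 plays the role of a status code.
fn data_saving(addr: String, value: String) -> BoxFut<u16> {
    Box::pin(async move {
        // Counterpart of `and_then`: run the second step only on success.
        let result = match connect(addr).await {
            Ok(conn) => send_set(conn, value).await,
            Err(e) => Err(e),
        };
        // Counterpart of `then`: map both outcomes to a status code.
        match result {
            Ok(_) => Ok(200),
            Err(_) => Ok(500),
        }
    })
}

/// Waker that does nothing; enough for futures that never actually wait.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Minimal executor for this sketch only: polls in a loop until ready.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}
```

Calling block_on(data_saving("127.0.0.1:6379".to_string(), "data".to_string())) drives the boxed chain to completion. The point is only that the caller can chain a data_saving step after other steps without knowing what is inside the box; the cost is one heap allocation per call.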