Future chaining


#1

Hi, guys! My service based on Hyper framework and i just wanna add async saving for request body in redis, for this purpose i’m using lib. There is some examples of framework using example. The problem starts when i’m trying to end future chain with another future chain fn data_saving.

Questions:

  1. Should I just divide data_saving chain? if yes, how can i do it? When I’m trying to divide it i facing with type completion problems. First i’m trying to execute database connection future and can’t describe returning type, because of PairedConnectionBox is lib private type and there is no way to use it.
  2. Or i can execute future -> future -> chain future -> future, but i don’t know how :frowning:
  3. Maybe here is another way of using.

extern crate hyper;
extern crate futures;
#[macro_use]
extern crate serde_json;
extern crate queryst;
extern crate url;
#[macro_use]
extern crate base64;
extern crate tokio;
#[macro_use]
extern crate redis_async;

use futures::future;
use std::{thread, time};
use hyper::header::ContentType;
use hyper::server::{Http, Request, Response, Service};
use hyper::{Method, StatusCode};
use futures::future::{Future, FutureResult};
use futures::Stream;
use hyper::{Body, Chunk};
use url::form_urlencoded;
use std::collections::HashMap;
use queryst::parse;
use std::io::{self, copy, Read};
use std::env;
use std::str;
use std::error::Error;
use redis_async::client;


fn parse_json(form_chunk: Chunk) -> FutureResult<serde_json::Value, hyper::Error> {
    if let Ok(j) = serde_json::from_slice(form_chunk.as_ref()) {
        futures::future::ok(j)
    } else {
        futures::future::err(hyper::Error::from(io::Error::new(
            io::ErrorKind::InvalidInput,
            "Json is invalid",
        )))
    }
}

fn data_saving(data: serde_json::Value) -> FutureResult<hyper::Response, hyper::Error> {
    let addr = env::args()
        .nth(1)
        .unwrap_or("127.0.0.1:6379".to_string())
        .parse()
        .unwrap();
    let db_connection = client::paired_connect(&addr);
    db_connection.and_then(|connection| {
        connection.send::<redis_async::resp::RespValue>(resp_array!["SET", "key", "data"])
    }).then(|result| match result {
        Ok(result) => {
            futures::future::ok(Response::new().with_status(StatusCode::InternalServerError))
        }
        Err(e) => {
            println!("Unexpected error: {:?}", e);
            futures::future::ok(Response::new().with_status(StatusCode::InternalServerError))
        }
    })
}

struct Echo;

impl Service for Echo {
    type Request = Request;
    type Response = Response;
    type Error = hyper::Error;
    type Future = Box<Future<Item=Self::Response, Error=Self::Error>>;

    fn call(&self, req: Request) -> Self::Future {

        match (req.method(), req.path()) {
            (&Method::Post, "/post_data") => {
                let future = req
                    .body()
                    .concat2()
                    .and_then(parse_json)
                    .and_then(move |json| data_saving(json));
                Box::new(future)
            }
            _ => {
                Box::new(futures::future::ok(
                    Response::new().with_status(StatusCode::NotFound)
                ))
            }
        }
    }
}


fn main() {
    let addr = "127.0.0.1:3000".parse().unwrap();
    println!("Server is running at {:?}", addr);
    let server = Http::new().bind(&addr, || Ok(Echo)).unwrap();
    server.run().unwrap();
}

#2

You can try

fn data_saving(data: serde_json::Value) -> Box<Future<Item=hyper::Response, Error=hyper::Error>> {
    let addr = env::args()
        .nth(1)
        .unwrap_or("127.0.0.1:6379".to_string())
        .parse()
        .unwrap();
    let db_connection = client::paired_connect(&addr);
    Box::new(db_connection.and_then(|connection| {
        connection.send::<redis_async::resp::RespValue>(resp_array!["SET", "key", "data"])
    }).then(|result| match result {
        Ok(result) => {
            futures::future::ok(Response::new().with_status(StatusCode::InternalServerError))
        }
        Err(e) => {
            println!("Unexpected error: {:?}", e);
            futures::future::ok(Response::new().with_status(StatusCode::InternalServerError))
        }
    }))
}

Or:

fn data_saving(data: serde_json::Value) -> impl Future<Item=hyper::Response, Error=hyper::Error> {
    let addr = env::args()
        .nth(1)
        .unwrap_or("127.0.0.1:6379".to_string())
        .parse()
        .unwrap();
    let db_connection = client::paired_connect(&addr);
    db_connection.and_then(|connection| {
        connection.send::<redis_async::resp::RespValue>(resp_array!["SET", "key", "data"])
    }).then(|result| match result {
        Ok(result) => {
            futures::future::ok(Response::new().with_status(StatusCode::InternalServerError))
        }
        Err(e) => {
            println!("Unexpected error: {:?}", e);
            futures::future::ok(Response::new().with_status(StatusCode::InternalServerError))
        }
    })
}

Futures chaining try to return impl Future but failed
Futures chaining try to return impl Future but failed
#3

Absolutely right and it’s work great! Thank you so much, you save me from prolonged depression.
Could you please explain, why it works? Box::new just allocate memory in heap and give me opportunity to refer future chain?? or if it’s possible some explanation topic, I try to understand futures already 4-5 days.


#4

Box<Future<...>> is (one form of) a trait object, which has the effect of erasing the underlying type (which you weren’t able to name in this case because it’s private). There’s more info on trait objects in the Rust book. But if you want me to elaborate, let me know - keeping it short to not bore you and others.

Box<Future<...>> does allocate on the heap (and also involves virtual dispatch), so it has a cost. That’s a big reason the impl Trait feature was added to Rust 1.26, which is the 2nd version I gave you above. If you can, you should use that instead of a boxed future.


#5

Strange, I referred to this post and implemented my code in the same way to return impl Future (not exactly the same), but compile said:
expected struct futures::FutureResult, found struct futures::Then


#6

It would help us to help you if you paste the code you have. You should also post a new thread rather than reopening this old one; if you think there’s value in seeing this thread as well, you can simply refer/link to it from your new post.