How can I forward the stream of a Hyper client response to a Hyper server response? (Proxy)

Hi,

I'm trying to write an HTTP proxy with Hyper (0.11, the new Tokio version). When a request comes in, a request to an upstream server is triggered with client.get(), and the response should be forwarded as the server response. Here is what I have so far:

extern crate hyper;
extern crate futures;
extern crate tokio_core;

use hyper::header::ContentLength;
use hyper::Client;
use hyper::server::{Http, Request, Response, Service};
use hyper::header::Host;
use std::error::Error;
use std::io::{self, Write};
use futures::{Future, Stream};
use tokio_core::reactor::Core;

struct Proxy {
    upstream_port: u16,
}

impl Service for Proxy {
    // boilerplate hooking up hyper's server types
    type Request = Request;
    type Response = Response;
    type Error = hyper::Error;
    // The future representing the eventual Response your call will
    // resolve to. This can change to whatever Future you need.
    type Future = futures::future::FutureResult<Self::Response, Self::Error>;

    fn call(&self, request: Request) -> Self::Future {
        let response = pipe_through(request, self.upstream_port);
        futures::future::ok(response)
    }
}

pub fn start_server(port: u16, upstream_port: u16) {
    let address = "127.0.0.1:".to_owned() + &port.to_string();
    println!("Listening on {}", address);
    let addr = address.parse().unwrap();
    let server = Http::new()
        .bind(&addr, move || Ok(Proxy { upstream_port }))
        .unwrap();
    server.run().unwrap();
}

fn pipe_through(request: Request, upstream_port: u16) -> Response {
    let mut response: Response = Response::new();

    let mut core = Core::new().unwrap();
    let client = Client::new(&core.handle());

    let uri = "http://drupal-8.localhost".parse().unwrap();
    let work = client
        .get(uri)
        .and_then(|upstream_response| {
            println!("Response: {}", upstream_response.status());

            upstream_response
                .body()
                .for_each(|chunk| {
                              response
                                  .write_all(&chunk)
                                  .map(|_| ())
                                  .map_err(From::from)
                          })
        });
    core.run(work).unwrap();

    response
}

fn main() {
    let port: u16 = 9090;
    let upstream_port: u16 = 80;
    let _listening = start_server(port, upstream_port);
}

This of course fails to compile with "no method named write_all found for type hyper::Response". How can I write chunks of data continuously to a server response object? Ideally I would stream the client response through, but I can't find the right methods I should use.

Thanks!

hyper::Body is a futures::Stream, so you should be able to work with it directly and skip the for_each processing. I've not tested the example below, and the Hyper API may not work this way.

The hyper::Body docs[1] say Body is used in both requests and responses, although its API doesn't provide a direct means to write data into it, nor an explicit means to tie a client response Body to a server response Body. I'm assuming it's as easy as my example shows.

[1]: hyper::Body - Rust

fn pipe_through(request: Request, upstream_port: u16) -> Response {
    let response: Response = Response::new();

    let mut core = Core::new().unwrap();
    let client = Client::new(&core.handle());

    let uri = "http://drupal-8.localhost".parse().unwrap();
    let work = client
        .get(uri)
        .map(|upstream_response| {   // switched to `map`, there's no blocking here
            println!("Response: {}", upstream_response.status());

            let client_response_body = upstream_response.body();

            // connect the upstream body to the downstream response.
            // btw, we just pulled response into this closure.  we'll need
            // to get it out somehow.
            let server_response = response.with_body(client_response_body);

            // return response so core will give it back outside of closure
            server_response 
        });

    // response returned by core
    // if `work` fails, what kind of response do you provide?
    match core.run(work) {
        // I think `server::Response` will handle streaming the body when
        // it's sent.
        Ok(response) => response,
        Err(_) => {
            // generate an error response to send downstream
            Response::new()
                .with_status(hyper::StatusCode::InternalServerError)
                .with_body("upstream request failed")
        }
    }
}

Thanks a lot, I got this working with an even simpler version:

fn pipe_through(request: Request, upstream_port: u16) -> Response {
    let mut core = Core::new().unwrap();
    let client = Client::new(&core.handle());

    let uri = "http://drupal-8.localhost".parse().unwrap();

    let work = client.get(uri);

    match core.run(work) {
        // Directly pass through the client response.
        Ok(response) => response,
        Err(_) => {
            let mut response = Response::new();
            response.set_status(StatusCode::InternalServerError);
            response.set_body("error");

            response
        }
    }
}

Turns out a client response is compatible with a server response and I can just return it 1:1! Now that is the exact smoothness you want from a good HTTP framework, yay Hyper!

Now I wonder if this could be simplified even further. I'm not interested in the Tokio event loop work; I can only act when my single upstream response is available anyway. So ideally there would be a FutureResult::just_do_the_work_i_will_wait_here_for_you() method and I should not need to set up tokio_core::reactor::Core.


Note that this code blocks the thread it runs on while waiting for the upstream response, so your proxy will only handle one request at a time.

The code below is as far as I got with a rewrite of your example to avoid blocking entirely. I'm stuck on a "the trait std::marker::Send is not implemented" error.

If that trait error can be sorted out, you might be able to reuse one Client instance on the Proxy instead of creating new Clients with the Proxy's handle.

extern crate hyper;
extern crate futures;
extern crate tokio_core;

use hyper::header::ContentLength;
use hyper::{Body, Client, StatusCode};
use hyper::client::HttpConnector;
use hyper::server::{Http, Request, Response, Service, Server};
use hyper::header::Host;
use std::error::Error;
use std::io::{self, Write};
use std::net::SocketAddr;
use std::time::Duration;
use futures::{BoxFuture, Future, Stream};
use tokio_core::reactor::{Core, Handle};
use tokio_core::net::TcpListener;

struct Proxy {
    upstream_port: u16,
    // reuse server's Core handle
    handle: Handle,
}

impl Service for Proxy {
    // boilerplate hooking up hyper's server types
    type Request = Request;
    type Response = Response;
    type Error = hyper::Error;
    // The future representing the eventual Response your call will
    // resolve to. This can change to whatever Future you need.
    type Future = BoxFuture<Self::Response, Self::Error>;

    fn call(&self, request: Request) -> Self::Future {
        let response = pipe_through(request, &self.handle, self.upstream_port);
        response
    }
}

pub fn start_server(port: u16, upstream_port: u16) {
    let address = "127.0.0.1:".to_owned() + &port.to_string();
    println!("Listening on {}", address);
    let addr = address.parse().unwrap();

    // Core handle is created by Server, but Proto had to be created first and it needs that
    // handle.  copied code from Hyper to create Proto and Server at same time so they use same
    // Core handle.
    let server = bind_service(Http::new(), &addr, upstream_port).unwrap();
    server.run().unwrap();
}

// taken from Hyper source : https://github.com/hyperium/hyper/blob/master/src/server/mod.rs#L88
pub fn bind_service<Bd>(
    http: Http,
    addr: &SocketAddr,
    upstream_port: u16,
) -> hyper::error::Result<Server<Proxy, Bd>>
where
    Bd: Stream<Item = hyper::Chunk, Error = hyper::Error>,
{
    let core = try!(Core::new());
    let handle = core.handle();
    let listener = try!(TcpListener::bind(addr, &handle));

    Ok(Server {
        new_service: Proxy {
            upstream_port,
            handle,
        },
        core: core,
        listener: listener,
        protocol: http,
        shutdown_timeout: Duration::new(1, 0),
    })
}

fn pipe_through(
    request: Request,
    handle: &Handle,
    upstream_port: u16,
) -> BoxFuture<Response, hyper::Error> {
    let mut response: Response = Response::new();

    let client = Client::new(handle);

    let uri = "http://drupal-8.localhost".parse().unwrap();
    let work = client.get(uri).or_else(|_| {
        let mut response = Response::new();
        response.set_status(StatusCode::InternalServerError);
        response.set_body("error");

        futures::future::ok(response)
    });

    // don't block waiting for work to complete, let the server Core trickle through the chain of
    // futures in a non-blocking fashion.
    work.boxed()
}

fn main() {
    let port: u16 = 9090;
    let upstream_port: u16 = 80;
    let _listening = start_server(port, upstream_port);
}

After screwing with this a bit too much, I'm of the opinion that the solution isn't easily found if one exists at all.

The previous code example will never work, because Hyper doesn't expose enough of its API to create Server instances. That realization was hidden behind the multitude of Rust compiler errors I've dug through just to hit that dead end.

Your options at this point are pretty slim.

  1. Figure out how to use tokio_core::reactor::Core for the hyper::client::Client without blocking. You might ask on gitter.im/tokio-rs/tokio. I think spawning a new thread with its own Core and synchronizing over a oneshot::channel is probably your only recourse here (there's a rough sketch of this below the list).
  2. Ask hyper devs how to build a Service that encapsulates a Client that shares a Handle with the Server.
  3. Fork hyper, modify src/server/mod.rs, and add a bind_with_core() function to Http; then my bind_service function could create the Server with that.
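
Here is roughly what I mean by option 1. Completely untested sketch: the worker thread reads the whole upstream body (so nothing streams, and headers are not copied over), then hands a plain Response back through the oneshot channel.

use futures::sync::oneshot;
use std::thread;

fn call(&self, _request: Request) -> BoxFuture<Response, hyper::Error> {
    let (tx, rx) = oneshot::channel();

    thread::spawn(move || {
        // This thread owns its own Core, so the client work can be driven to
        // completion without touching the server's event loop.
        let mut core = Core::new().unwrap();
        let client = Client::new(&core.handle());
        let uri = "http://drupal-8.localhost".parse().unwrap();

        let work = client.get(uri).and_then(|upstream_response| {
            let status = upstream_response.status();
            // Collect the whole body while this Core is still running.
            upstream_response.body().concat2().map(move |body| {
                Response::new().with_status(status).with_body(body.to_vec())
            })
        });

        let result = match core.run(work) {
            Ok(response) => response,
            Err(_) => Response::new().with_status(StatusCode::InternalServerError),
        };
        // The receiver may have been dropped in the meantime; ignore that case.
        let _ = tx.send(result);
    });

    rx.then(|result| match result {
            // The worker finished and sent us a Response.
            Ok(response) => futures::future::ok::<_, hyper::Error>(response),
            // The worker thread died before sending anything.
            Err(_) => futures::future::ok(Response::new()
                .with_status(StatusCode::InternalServerError)),
        })
        .boxed()
}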

Maybe someone else can offer more options.


A related Hyper issue, about hyper accepting a premade Core: https://github.com/hyperium/hyper/issues/1075


It's actually even easier, since Client is a Service, and a server needs a Service:

let mut core = Core::new()?;
let handle = core.handle();
let client = Client::new(&handle);
let http = Http::new();

let listener = TcpListener::bind(&addr, &handle)?;

let serve = listener.incoming().for_each(move |(sock, addr)| {
    http.bind_connection(&handle, sock, addr, client.clone());
    Ok(())
});

core.run(serve)?;

This is a super naïve proxy. It doesn't correctly deal with headers the way proxies should, and it doesn't work with "regular" requests, only those made specifically for proxies (so, GET http://httpbin.org/ip will work just fine, GET /ip will error as the Client tells you it doesn't know how to connect with just the path /ip).


As to making use of an existing Core, we've heard that it's annoying. In my opinion only, it's one of those things that once you learn how to do it, it's pretty easy to do when you need to. For that reason, I have been spending my time in other areas of hyper. Still, again, we know it's initially annoying.

This is just wild speculation here, I haven't planned this out with Tokio yet, but if you look at the PRs to the futures and tokio-core crates, there may be movement towards being generic over an Executor, and possibly a way to store and grab "the" thread executor. Then maybe it's easy to do Client::new(executor::current()) or something.


Thanks, that is good info! Even if I just pass the response through for now, I will want to modify it in a future iteration of my proxy. I have come this far:

extern crate hyper;
extern crate futures;
extern crate tokio_core;

use hyper::Client;
use hyper::server::{Http, Request, Response, Service};
use tokio_core::net::TcpListener;
use tokio_core::reactor::Core;
use hyper::StatusCode;
use futures::Future;
use futures::Stream;
use hyper::client::HttpConnector;

struct Proxy {
    upstream_port: u16,
    client: Client<HttpConnector>,
}

impl Service for Proxy {
    type Request = Request;
    type Response = Response;
    type Error = hyper::Error;
    type Future = futures::future::FutureResult<Self::Response, Self::Error>;

    fn call(&self, request: Request) -> Self::Future {
        let uri = "http://drupal-8.localhost".parse().unwrap();

        self.client.get(uri)
    }
}

pub fn start_server(port: u16, upstream_port: u16) {
    let address = "127.0.0.1:".to_owned() + &port.to_string();
    println!("Listening on {}", address);
    let addr = address.parse().unwrap();

    // Prepare a Tokio core that we will use for our server and our client.
    let mut core = Core::new().unwrap();
    let handle = core.handle();
    let http = Http::new();
    let listener = TcpListener::bind(&addr, &handle).unwrap();
    let client = Client::new(&handle);

    let server = listener
        .incoming()
        .for_each(move |(sock, addr)| {
            http.bind_connection(&handle,
                                 sock,
                                 addr,
                                 Proxy {
                                     upstream_port: upstream_port,
                                     client: client.clone(),
                                 });
            Ok(())
        });

    core.run(server).unwrap();
}

That does not compile:

error[E0308]: mismatched types
  --> src/lib.rs:28:9
   |
28 |         self.client.get(uri)
   |         ^^^^^^^^^^^^^^^^^^^^ expected struct `futures::FutureResult`, found struct `hyper::client::FutureResponse`
   |
   = note: expected type `futures::FutureResult<hyper::Response, hyper::Error>`
              found type `hyper::client::FutureResponse`

So now the challenge is to convert a hyper::client::FutureResponse into a futures::FutureResult<hyper::Response, hyper::Error>, which of course I have no idea how to do :smiley:

You don't need to convert them (in fact, you won't be able to). The FutureResult is basically a std::result::Result that implements Future. In other words, it means you have a value that is ready right now, wrapped as a future. But you don't have that, since you won't know how to respond until the proxied request finishes.

So, you can change the type Future in your Service to the correct type you need.
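
For example (untested, but FutureResponse already has Response as its Item and hyper::Error as its Error, so it satisfies the Service bound):

impl Service for Proxy {
    type Request = Request;
    type Response = Response;
    type Error = hyper::Error;
    // The client's future already resolves to a Response with a hyper::Error,
    // so it can serve as the Service future directly.
    type Future = hyper::client::FutureResponse;

    fn call(&self, _request: Request) -> Self::Future {
        let uri = "http://drupal-8.localhost".parse().unwrap();
        self.client.get(uri)
    }
}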


Ah right - that works!

But I also want to return direct responses, for example if the request was invalid, so I need to unify two different kinds of futures into one return type. As a Rust newbie I tried an enum:

enum WeirdResponseFutures {
    Client_Response(FutureResponse),
    Request_Error(futures::future::FutureResult<Response, hyper::error::Error>),
}

struct Proxy {
    upstream_port: u16,
    client: Client<HttpConnector>,
}

impl Service for Proxy {
    type Request = Request;
    type Response = Response;
    type Error = hyper::Error;
    type Future = WeirdResponseFutures;

    fn call(&self, request: Request) -> Self::Future {
        let host = match request.headers().get::<Host>() {
            None => {
                return futures::future::ok(Response::new()
                                               .with_status(StatusCode::BadRequest)
                                               .with_body("No host header in request"));

            }
            Some(h) => h,
        };
        let request_uri = request.uri();
        let upstream_uri = ("http://".to_string() + &host.to_string() + request_uri.path())
            .parse()
            .unwrap();

        self.client.get(upstream_uri)
    }
}

So of course the compiler does not like that, because my enum does not implement the Future trait. How can I tell the compiler that my enum can only contain futures anyway, so that regardless of what the eventual return value is, it will be a super good future?

Or can I invent an artificial future type on my own that wraps two different kinds of futures?

And of course, while typing this up I checked the docs one more time and found the "Echo, echo, echo" hyper guide, which covers the "Either" concept at the end; that is probably what I'm looking for :slight_smile:
Posting this anyway for the next time I look this up.


You definitely can do that with WeirdResponseFutures. You just need to implement Future for that type, and it should all work. In cases where you only have 2 futures to pick from, the futures::future::Either type is also a possibility.
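
With Either, a rough (untested) version of your call based on the enum example above could look like this:

use futures::future::{self, Either, FutureResult};
use hyper::client::FutureResponse;
use hyper::header::Host;

impl Service for Proxy {
    type Request = Request;
    type Response = Response;
    type Error = hyper::Error;
    // A is the immediate error response, B is the proxied upstream response;
    // both have the same Item and Error, so Either implements Future here.
    type Future = Either<FutureResult<Response, hyper::Error>, FutureResponse>;

    fn call(&self, request: Request) -> Self::Future {
        let host = match request.headers().get::<Host>() {
            None => {
                return Either::A(future::ok(Response::new()
                    .with_status(StatusCode::BadRequest)
                    .with_body("No host header in request")));
            }
            Some(host) => host.to_string(),
        };
        let upstream_uri = ("http://".to_string() + &host + request.uri().path())
            .parse()
            .unwrap();
        Either::B(self.client.get(upstream_uri))
    }
}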


Thanks, futures::future::Either does the trick for me :slightly_smiling_face:

I think I got this working now as it should, server code: https://github.com/klausi/rustnish/blob/goal-02/src/lib.rs

The integration test was a bit annoying because of all the boilerplate: https://github.com/klausi/rustnish/blob/goal-02/tests/integration_tests.rs

My least favorite line from that is:

assert_eq!(Ok("hello"), str::from_utf8(&response.body().concat2().wait().unwrap()));

The amount of thinking I have to do in a test case just to read a response body is a bit too much for my taste :stuck_out_tongue:
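
One way to trim this down (an untested sketch; client, core and uri stand in for whatever the test already sets up) would be to collect the body inside the future that the Core runs, so the assertion only compares Strings:

// Build the whole "request + read body" pipeline as one future and let
// core.run() drive it, so no extra .wait() calls are needed in the test.
let work = client
    .get(uri)
    .and_then(|response| response.body().concat2())
    .map(|body| String::from_utf8(body.to_vec()).unwrap());

let body = core.run(work).unwrap();
assert_eq!("hello", body);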

Having some trouble defining Future types again, posted a follow-up in How do you calculate the Future types of a Hyper Tokio service?

@seanmonstar the client.clone() call you suggested is causing a memory leak for me now; not sure if I'm doing something wrong or if it's a Hyper bug. Reported at https://github.com/hyperium/hyper/issues/1315