How do you cache a Hyper 0.12 response?

I'm trying to write a reverse proxy that caches Hyper responses in memory. The proxy receives a request, forwards it to an upstream server, returns the response. Current code is at https://github.com/klausi/rustnish/blob/goal-10/src/lib.rs

Attempt 1: use LruCache::<String, Response<Body>> as in the code above. Compile error:

error[E0277]: `(dyn futures::Stream<Item=hyper::Chunk, Error=std::boxed::Box<(dyn std::error::Error + std::marker::Send + std::marker::Sync + 'static)>> + std::marker::Send + 'static)` cannot be shared between threads safely
   --> src/lib.rs:169:13
    |
169 |             tokio::spawn(
    |             ^^^^^^^^^^^^ `(dyn futures::Stream<Item=hyper::Chunk, Error=std::boxed::Box<(dyn std::error::Error + std::marker::Send + std::marker::Sync + 'static)>> + std::marker::Send + 'static)` cannot be shared between threads safely
    |
    = help: the trait `std::marker::Sync` is not implemented for `(dyn futures::Stream<Item=hyper::Chunk, Error=std::boxed::Box<(dyn std::error::Error + std::marker::Send + std::marker::Sync + 'static)>> + std::marker::Send + 'static)`
    = note: required because of the requirements on the impl of `std::marker::Sync` for `std::ptr::Unique<(dyn futures::Stream<Item=hyper::Chunk, Error=std::boxed::Box<(dyn std::error::Error + std::marker::Send + std::marker::Sync + 'static)>> + std::marker::Send + 'static)>`
    = note: required because it appears within the type `std::boxed::Box<(dyn futures::Stream<Item=hyper::Chunk, Error=std::boxed::Box<(dyn std::error::Error + std::marker::Send + std::marker::Sync + 'static)>> + std::marker::Send + 'static)>`
    = note: required because it appears within the type `hyper::body::body::Kind`
    = note: required because it appears within the type `hyper::Body`
    = note: required because it appears within the type `hyper::Response<hyper::Body>`
...

Hm, seems like Response<Body> cannot be used directly in the cache. What data structure do I need to convert Response into so that it can be put into a cache?

Attempt 2: use Response<Vec<u8>> that replaces the Body stream with a static array of bytes that can be shared between threads.

Now I add a stub method for storing in the cache:

fn cache_store(&self, request: &Request<Body>, response: &Response<Vec<u8>>) -> bool {
    false
}

and invoke it:

self.cache_store(&request, &response);

Compile error:

error[E0308]: mismatched types
   --> src/lib.rs:124:48
    |
124 |                     self.cache_store(&request, &response);
    |                                                ^^^^^^^^^ expected struct `std::vec::Vec`, found struct `hyper::Body`
    |
    = note: expected type `&hyper::Response<std::vec::Vec<u8>>`
               found type `&hyper::Response<hyper::Body>`

Ah yes, a response does not automatically convert itself into a bytes body. How could I do that?

Response has a map() method to convert bodies.

let mapped_response: Response<Vec<u8>> =
    response.map(|b| b.concat2().wait().unwrap().as_bytes());

Compile error:

error[E0599]: no method named `as_bytes` found for type `hyper::Chunk` in the current scope
   --> src/lib.rs:121:70
    |
121 |                         response.map(|b| b.concat2().wait().unwrap().as_bytes());

How do you convert a Body into bytes?

Attempt 3: looks like hyper::Chunk has a method into_bytes(), let's try that:

let mapped_response: Response<Vec<u8>> =
    response.map(|b| b.concat2().wait().unwrap().into_bytes());

Compile error:

error[E0308]: mismatched types
   --> src/lib.rs:121:25
    |
121 |                         response.map(|b| b.concat2().wait().unwrap().into_bytes());
    |                         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ expected struct `std::vec::Vec`, found struct `bytes::Bytes`
    |
    = note: expected type `hyper::Response<std::vec::Vec<u8>>`
               found type `hyper::Response<bytes::Bytes>`

Aha, there is a Bytes type. Maybe I should use that instead of Vec<u8>?

There is also the to_vec() method, let's try that:

let mapped_response: Response<Vec<u8>> =
    response.map(|b| b.concat2().wait().unwrap().to_vec());

// Put the response into the cache.
self.cache_store(&request, &mapped_response);

But that brings back a similar compile error as in the OP:

error[E0277]: `(dyn futures::Stream<Item=hyper::Chunk, Error=std::boxed::Box<(dyn std::error::Error + std::marker::Send + std::marker::Sync + 'static)>> + std::marker::Send + 'static)` cannot be shared between threads safely
   --> src/lib.rs:97:9
    |
97  | /         Box::new(self.client.request(request).then(|result| {
98  | |             let our_response = match result {
99  | |                 Ok(mut response) => {
100 | |                     let version = match response.version() {
...   |
140 | |             futures::future::ok(our_response)
141 | |         }))
    | |___________^ `(dyn futures::Stream<Item=hyper::Chunk, Error=std::boxed::Box<(dyn std::error::Error + std::marker::Send + std::marker::Sync + 'static)>> + std::marker::Send + 'static)` cannot be shared between threads safely
    |
    = help: the trait `std::marker::Sync` is not implemented for `(dyn futures::Stream<Item=hyper::Chunk, Error=std::boxed::Box<(dyn std::error::Error + std::marker::Send + std::marker::Sync + 'static)>> + std::marker::Send + 'static)`
    = note: required because of the requirements on the impl of `std::marker::Sync` for `std::ptr::Unique<(dyn futures::Stream<Item=hyper::Chunk, Error=std::boxed::Box<(dyn std::error::Error + std::marker::Send + std::marker::Sync + 'static)>> + std::marker::Send + 'static)>`
    = note: required because it appears within the type `std::boxed::Box<(dyn futures::Stream<Item=hyper::Chunk, Error=std::boxed::Box<(dyn std::error::Error + std::marker::Send + std::marker::Sync + 'static)>> + std::marker::Send + 'static)>`
    = note: required because it appears within the type `hyper::body::body::Kind`
    = note: required because it appears within the type `hyper::Body`
    = note: required because it appears within the type `hyper::Request<hyper::Body>`
...

Why are Body and Chunk still involved here? I just made a mapped response.

Let's try the usual desperate compiler fight solution and throw in a clone():

let mapped_response: Response<Vec<u8>> =
    response.map(|b| b.concat2().wait().unwrap().to_vec().clone());

// Put the response into the cache.
self.cache_store(&request, &mapped_response);

Does not help, same compile error as before. How do you tell the compiler that you want a clone of response and it should not worry about thread safety?

Attempt 4: do not use Response as the cached data type, let's start with something simple like plain bytes.

LruCache::<String, Vec<u8>>

Read the body:

let body_bytes = response.body().concat2().wait().unwrap();

Compile error:

error[E0507]: cannot move out of borrowed content
   --> src/lib.rs:124:42
    |
124 |                         let body_bytes = response.body().concat2().wait().unwrap();
    |                                          ^^^^^^^^^^^^^^^ cannot move out of borrowed content

Right, the response still retains ownership of the body. How can we make a copy of the body while leaving ownership with the response that we later return?

Let's try to consume the body, clone it, then assemble a new response that we can return.

let (parts, body) = response.into_parts();
let body_bytes = body.concat2().wait().unwrap().to_vec();

// Put the response into the cache.
self.cache_store(&request, body_bytes);

Response::from_parts(parts, Body::from(body_bytes.clone()))

That gives us the good old compile error we have seen a couple of times already:

error[E0277]: `(dyn futures::Stream<Item=hyper::Chunk, Error=std::boxed::Box<(dyn std::error::Error + std::marker::Send + std::marker::Sync + 'static)>> + std::marker::Send + 'static)` cannot be shared between threads safely
   --> src/lib.rs:97:9
    |
97  | /         Box::new(self.client.request(request).then(|result| {
98  | |             let our_response = match result {
99  | |                 Ok(mut response) => {
100 | |                     let version = match response.version() {
...   |
146 | |             futures::future::ok(our_response)
147 | |         }))
    | |___________^ `(dyn futures::Stream<Item=hyper::Chunk, Error=std::boxed::Box<(dyn std::error::Error + std::marker::Send + std::marker::Sync + 'static)>> + std::marker::Send + 'static)` cannot be shared between threads safely
    |
    = help: the trait `std::marker::Sync` is not implemented for `(dyn futures::Stream<Item=hyper::Chunk, Error=std::boxed::Box<(dyn std::error::Error + std::marker::Send + std::marker::Sync + 'static)>> + std::marker::Send + 'static)`
    = note: required because of the requirements on the impl of `std::marker::Sync` for `std::ptr::Unique<(dyn futures::Stream<Item=hyper::Chunk, Error=std::boxed::Box<(dyn std::error::Error + std::marker::Send + std::marker::Sync + 'static)>> + std::marker::Send + 'static)>`
    = note: required because it appears within the type `std::boxed::Box<(dyn futures::Stream<Item=hyper::Chunk, Error=std::boxed::Box<(dyn std::error::Error + std::marker::Send + std::marker::Sync + 'static)>> + std::marker::Send + 'static)>`
    = note: required because it appears within the type `hyper::body::body::Kind`
    = note: required because it appears within the type `hyper::Body`
    = note: required because it appears within the type `hyper::Request<hyper::Body>`

So while the response handling now seems ok, it appears that we are not allowed to pass the request object into my self.cache_store() method. What do you need to convert a Hyper request into so that it is usable in methods of the Proxy Service impl I have here?

Attempt 5: do not pass any request instance to the method, just the response bytes, to get anything compiling at all.

let (parts, body) = response.into_parts();
let body_bytes = body.concat2().wait().unwrap().to_vec();

// Put the response into the cache.
self.cache_store(body_bytes);

Response::from_parts(parts, Body::from(body_bytes.clone()))

Compile error:

error[E0495]: cannot infer an appropriate lifetime due to conflicting requirements
   --> src/lib.rs:93:52
    |
93  |           Box::new(self.client.request(request).then(|result| {
    |  ____________________________________________________^
94  | |             let our_response = match result {
95  | |                 Ok(mut response) => {
96  | |                     let version = match response.version() {
...   |
130 | |             futures::future::ok(our_response)
131 | |         }))
    | |_________^
    |
note: first, the lifetime cannot outlive the anonymous lifetime #1 defined on the method body at 46:5...
   --> src/lib.rs:46:5
    |
46  | /     fn call(&mut self, mut request: Request<Body>) -> Self::Future {
47  | |         if let Some(response) = self.cache_lookup(&request) {
48  | |             return Box::new(futures::future::ok(response));
49  | |         }
...   |
131 | |         }))
132 | |     }
    | |_____^
    = note: ...so that the types are compatible:
            expected &&mut Proxy
               found &&mut Proxy
    = note: but, the lifetime must be valid for the static lifetime...
    = note: ...so that the expression is assignable:
            expected std::boxed::Box<(dyn futures::Future<Item=hyper::Response<hyper::Body>, Error=hyper::Error> + std::marker::Send + 'static)>
               found std::boxed::Box<dyn futures::Future<Item=hyper::Response<hyper::Body>, Error=hyper::Error> + std::marker::Send>

What could this lifetime problem mean? We are in a Future closure, so maybe the Proxy instance is called later when the outer call() method is long gone? How do I pass my cache around so that it is compatible with Future lifetimes?
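
A std-only sketch of one reading of this error, with loud assumptions: thread::spawn stands in for the tokio executor because both demand a 'static closure, and the Cache and Proxy types below are hypothetical, not the code from the repository. A closure that borrows self cannot be 'static, while a closure that owns a cloned Arc handle can.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical cache handle. Cloning copies the Arc pointer, not the map.
#[derive(Clone, Default)]
struct Cache {
    inner: Arc<Mutex<HashMap<String, Vec<u8>>>>,
}

impl Cache {
    fn store(&self, key: &str, body: Vec<u8>) {
        self.inner.lock().unwrap().insert(key.to_string(), body);
    }

    fn lookup(&self, key: &str) -> Option<Vec<u8>> {
        self.inner.lock().unwrap().get(key).cloned()
    }
}

/// Hypothetical stand-in for the proxy service.
struct Proxy {
    cache: Cache,
}

impl Proxy {
    /// Stand-in for Service::call. The spawned closure may outlive this
    /// method, so it must own everything it touches instead of borrowing self.
    fn call(&self, key: String, upstream_body: Vec<u8>) -> thread::JoinHandle<Vec<u8>> {
        // Clone the handle before the closure and move the clone in.
        let cache = self.cache.clone();
        thread::spawn(move || {
            cache.store(&key, upstream_body.clone());
            upstream_body
        })
    }
}
```

Joining the handle returned by call() yields the body, and a later lookup() on the proxy's own cache finds the copy, because both handles point at the same map.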

Attempt 6: Try to store the body as bytes and the header part. Create a Cache struct that we can clone before passing into the client response closure.

type CachedResponse = (Parts, Vec<u8>);

#[derive(Clone)]
struct Cache {
    lru_cache: Arc<RwLock<LruCache<String, CachedResponse>>>,
}

impl Cache {
    fn cache_key(&self, request: &Request<Body>) -> Option<String> {
        Some("x".to_string())
    }

    /// Check if we have a response for this request in memory.
    fn lookup(&self, cache_key: &Option<String>) -> Option<Response<Body>> {
        None
    }

    fn store(&self, cache_key: Option<String>, header_part: Parts, body_bytes: Vec<u8>) -> bool {
        if let Some(key) = cache_key {
            let mut inner_cache = self.lru_cache.write().unwrap();
            inner_cache.insert(key, (header_part, body_bytes));
            return true;
        }
        false
    }
}

Use it like this:

let cloned_cache = self.cache.clone();

Box::new(self.client.request(request).then(move |result| {
    // ...

            let (parts, body) = response.into_parts();
            let body_bytes = body.concat2().wait().unwrap().to_vec();

            // Put the response into the cache.
            cloned_cache.store(cache_key, parts.clone(), body_bytes.clone());

            Response::from_parts(parts, Body::from(body_bytes))
    // ...
}))

This almost works, but unfortunately the header parts of a response cannot be cloned:

error[E0599]: no method named `clone` found for type `http::response::Parts` in the current scope
   --> src/lib.rs:124:57
    |
124 |                     cloned_cache.store(cache_key, parts.clone(), body_bytes.clone());

I'm desperately missing a clone method on Hyper Response instances or the header part. How can I copy the full data in a Hyper response?

I’ve not had time to look at all of your attempts in detail, but this one doesn’t work because you’re using RwLock - it requires that the value you hold in it is Sync, not just Send (because it allows concurrent reads). If you use a Mutex instead then it should work because the underlying hyper type is Send.
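
The difference in bounds can be checked at compile time with the standard library alone. In this sketch FakeBody is a hypothetical stand-in for hyper::Body (a boxed trait object that is Send but not Sync), and the helper function names are made up for illustration.

```rust
use std::sync::{Arc, Mutex, RwLock};
use std::thread;

/// Hypothetical stand-in for hyper::Body: Send, but not Sync.
type FakeBody = Box<dyn Iterator<Item = Vec<u8>> + Send>;

fn assert_send<T: Send>() {}
fn assert_sync<T: Sync>() {}

/// Compiles only if every bound named below holds.
fn check_bounds() {
    assert_send::<FakeBody>();

    // Mutex<T> is Sync as soon as T is Send, so it can be shared via Arc.
    assert_sync::<Mutex<FakeBody>>();
    assert_send::<Arc<Mutex<FakeBody>>>();

    // RwLock<T> is Sync only if T is Send + Sync. Uncommenting either line
    // below fails with E0277, the same error family as in the posts above.
    // assert_sync::<RwLock<FakeBody>>();
    // assert_send::<Arc<RwLock<FakeBody>>>();

    // A plain byte buffer is Sync, so RwLock accepts it.
    assert_sync::<RwLock<Vec<u8>>>();
    assert_send::<Arc<RwLock<Vec<u8>>>>();
}

/// Moves a Mutex-wrapped body to another thread and counts its bytes there.
fn drain_on_other_thread(body: FakeBody) -> usize {
    let shared = Arc::new(Mutex::new(body));
    let worker = thread::spawn(move || {
        let mut guard = shared.lock().unwrap();
        let mut total = 0;
        while let Some(chunk) = guard.next() {
            total += chunk.len();
        }
        total
    });
    worker.join().unwrap()
}
```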

Good insight, thanks! The Body type is !Sync. Probably because it is a stream or a future or both.

I don't want to use Mutex because that would defeat the purpose of a fast cache: multiple readers are perfectly fine. Actually I would be interested in something even less strict: concurrent writes and reads at the same time because we don't care about consistency in a key-value cache.

So I see 2 options for caching hyper responses:

  1. Convert Body into a Sync structure for caching. Convert it back to Body when reading from the cache.
  2. Switch to Response<[u8]> or similar. Lose the ability to pass streaming bodies back and forth in a proxy.

Attempt 7: Build the cache with a custom struct that converts from Response.

struct CachedResponse {
    status: StatusCode,
    version: Version,
    headers: HeaderMap<HeaderValue>,
    body: Vec<u8>,
}

#[derive(Clone)]
struct Cache {
    lru_cache: Arc<RwLock<LruCache<String, CachedResponse>>>,
}

Store in the cache:

fn store(&self, cache_key: Option<String>, header_part: &Parts, body_bytes: Vec<u8>) -> bool {
    if let Some(key) = cache_key {
        let mut inner_cache = self.lru_cache.write().unwrap();
        let entry = CachedResponse {
            status: header_part.status,
            version: header_part.version,
            headers: header_part.headers.clone(),
            body: body_bytes,
        };
        inner_cache.insert(key, entry);
        return true;
    }
    false
}

Read from the cache:

/// Check if we have a response for this request in memory.
fn lookup(&self, cache_key: &Option<String>) -> Option<Response<Body>> {
    let inner_cache = self.lru_cache.read().unwrap();
    match inner_cache.get("x") {
        Some(entry) => {
            let mut response = Response::builder()
                .status(entry.status)
                .version(entry.version)
                .body(Body::from(entry.body.clone()))
                .unwrap();
            *response.headers_mut() = entry.headers.clone();
            Some(response)
        }
        None => None,
    }
}

Compilation fails:

error[E0596]: cannot borrow immutable borrowed content as mutable
   --> src/lib.rs:165:15
    |
165 |         match inner_cache.get("x") {
    |               ^^^^^^^^^^^ cannot borrow as mutable

error: aborting due to previous error

Oh no ... because the cache is created with LruCache::<String, CachedResponse>::with_expiry_duration_and_capacity(time_to_live, 20), every read access to the cache is also a write access! The cache has to update the last-accessed timestamp, so get() needs mutable access and the read guard of an RwLock is not enough. We will have to use Mutex or similar after all.
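
To make the "read is also a write" point concrete, here is a toy LRU written against std only. TinyLru is a hypothetical simplification: it tracks recency but has no expiry, unlike the LruCache used above. The part that matters is that get() takes &mut self.

```rust
use std::collections::{HashMap, VecDeque};
use std::sync::{Mutex, RwLock};

/// Toy LRU for illustration only (recency tracking, no expiry).
struct TinyLru {
    capacity: usize,
    entries: HashMap<String, Vec<u8>>,
    order: VecDeque<String>, // front = least recently used
}

impl TinyLru {
    fn new(capacity: usize) -> Self {
        TinyLru { capacity, entries: HashMap::new(), order: VecDeque::new() }
    }

    /// Marks a key as the most recently used one.
    fn touch(&mut self, key: &str) {
        self.order.retain(|k| k.as_str() != key);
        self.order.push_back(key.to_string());
    }

    fn insert(&mut self, key: &str, value: Vec<u8>) {
        self.entries.insert(key.to_string(), value);
        self.touch(key);
        while self.entries.len() > self.capacity {
            match self.order.pop_front() {
                Some(oldest) => { self.entries.remove(&oldest); }
                None => break,
            }
        }
    }

    /// A lookup updates the recency order, hence &mut self.
    fn get(&mut self, key: &str) -> Option<&Vec<u8>> {
        if self.entries.contains_key(key) {
            self.touch(key);
        }
        self.entries.get(key)
    }
}

/// With a Mutex every access is exclusive, so get() is always allowed.
fn lookup_with_mutex(cache: &Mutex<TinyLru>, key: &str) -> Option<Vec<u8>> {
    cache.lock().unwrap().get(key).cloned()
}

/// With an RwLock, cache.read() only gives shared access and get() is
/// rejected with E0596. The write lock works, but then no reads are concurrent.
fn lookup_with_rwlock(cache: &RwLock<TinyLru>, key: &str) -> Option<Vec<u8>> {
    cache.write().unwrap().get(key).cloned()
}
```

Under these assumptions an RwLock buys nothing over a Mutex, since every lookup ends up taking the exclusive lock anyway.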

Attempt 8: use a Mutex in Arc<Mutex<LruCache<String, CachedResponse>>>, otherwise same CachedResponse converting as in attempt 7.

This compiles and my first test case is passing, hooray!

Still a long way to go to make this fully functional, simplify and optimize it - but at least the compiler is shutting up now :smiley:
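
For readers who want the overall shape without the hyper types, here is a std-only sketch of the same idea. Everything in it is an assumption for illustration: Head stands in for http::response::Parts (deliberately not Clone), Response stands in for Response<Body>, and a plain HashMap replaces the LruCache.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for http::response::Parts. Not Clone on purpose.
struct Head {
    status: u16,
    headers: Vec<(String, String)>,
}

/// Hypothetical stand-in for Response<Body>.
struct Response {
    head: Head,
    body: Vec<u8>,
}

/// Cache entry made only of plain owned data, so it is Send and Sync.
struct CachedResponse {
    status: u16,
    headers: Vec<(String, String)>,
    body: Vec<u8>,
}

#[derive(Clone, Default)]
struct Cache {
    entries: Arc<Mutex<HashMap<String, CachedResponse>>>,
}

impl Cache {
    /// Copies the fields one by one because the head as a whole cannot be cloned.
    fn store(&self, cache_key: Option<String>, head: &Head, body: &[u8]) -> bool {
        let key = match cache_key {
            Some(key) => key,
            None => return false,
        };
        let entry = CachedResponse {
            status: head.status,
            headers: head.headers.clone(),
            body: body.to_vec(),
        };
        self.entries.lock().unwrap().insert(key, entry);
        true
    }

    /// Rebuilds a fresh, owned response from the cached fields.
    fn lookup(&self, cache_key: &Option<String>) -> Option<Response> {
        let key = cache_key.as_ref()?;
        let entries = self.entries.lock().unwrap();
        let entry = entries.get(key)?;
        Some(Response {
            head: Head { status: entry.status, headers: entry.headers.clone() },
            body: entry.body.clone(),
        })
    }
}
```

The design choice mirrored here is that the cache never holds the response type itself, only data copied out of it, so the lock's thread-safety bounds apply to simple values.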

I also played with the same problem, using the hyper client and caching the response.

My question is: can I avoid using wait?
body.concat2().wait().unwrap().to_vec()
Will wait block the tokio event loop?
If so, I couldn't find an example that returns a stream as the hyper body and, in the same function, waits for the stream to finish and stores it in the cache.

Yes, wait will block the event loop. Use and_then or an async block with await to wait for concat2 instead.
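
A rough std-only illustration of the combinator style, under heavy assumptions: std::future stands in for the futures 0.1 crate that hyper 0.12 actually uses, the polling loop is a toy executor and not tokio, and ConcatBody, StoreWhenDone and run_counting_polls are hypothetical names. The point is only that the caching step runs when the body future completes, and that each Pending result hands control back to the loop instead of blocking inside it.

```rust
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical body future: each poll consumes at most one chunk, then yields.
struct ConcatBody {
    pending: VecDeque<Vec<u8>>,
    buffer: Vec<u8>,
}

impl Future for ConcatBody {
    type Output = Vec<u8>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Vec<u8>> {
        match self.pending.pop_front() {
            Some(chunk) => {
                self.buffer.extend_from_slice(&chunk);
                cx.waker().wake_by_ref(); // ask to be polled again
                Poll::Pending
            }
            None => Poll::Ready(std::mem::take(&mut self.buffer)),
        }
    }
}

/// Combinator in the spirit of and_then/map: once the body is complete,
/// store a copy in the cache and pass the bytes on.
struct StoreWhenDone<'a> {
    body: ConcatBody,
    cache: &'a mut Vec<Vec<u8>>,
}

impl<'a> Future for StoreWhenDone<'a> {
    type Output = Vec<u8>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Vec<u8>> {
        match Pin::new(&mut self.body).poll(cx) {
            Poll::Ready(bytes) => {
                self.cache.push(bytes.clone());
                Poll::Ready(bytes)
            }
            Poll::Pending => Poll::Pending,
        }
    }
}

struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Toy executor: polls until ready and reports how many polls were needed.
/// A real event loop would run other tasks between polls.
fn run_counting_polls<F: Future>(future: F) -> (F::Output, usize) {
    let mut future = Box::pin(future);
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut polls = 0;
    loop {
        polls += 1;
        if let Poll::Ready(output) = future.as_mut().poll(&mut cx) {
            return (output, polls);
        }
    }
}
```

With a body of two chunks the toy loop polls three times: twice to absorb a chunk and once to finish. Every gap between polls is a point where an event loop could serve another connection.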

My challenge was how to clone the body stream into two separate futures (my Rust skills aren't that advanced yet).
This answer did the trick for me:
Fan-Out a stream -> Hyper::body.clone_body()
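
The linked answer is not reproduced here. Purely as an illustration of the fan-out idea, the std-only sketch below uses an Iterator of chunks as a stand-in for the hyper body stream; Tee and tee are hypothetical names. Each chunk is copied into a shared buffer as it passes through, so the consumer still receives the chunks one by one while the cache side ends up with the full body.

```rust
use std::sync::{Arc, Mutex};

/// Hypothetical adapter: forwards chunks unchanged and keeps a copy.
struct Tee<I> {
    inner: I,
    copy: Arc<Mutex<Vec<u8>>>,
}

impl<I: Iterator<Item = Vec<u8>>> Iterator for Tee<I> {
    type Item = Vec<u8>;

    fn next(&mut self) -> Option<Vec<u8>> {
        let chunk = self.inner.next()?;
        // Append to the shared copy before handing the chunk on.
        self.copy.lock().unwrap().extend_from_slice(&chunk);
        Some(chunk)
    }
}

/// Wraps a chunk source and returns it together with a handle to the copy.
fn tee<I>(chunks: I) -> (Tee<I>, Arc<Mutex<Vec<u8>>>)
where
    I: Iterator<Item = Vec<u8>>,
{
    let copy = Arc::new(Mutex::new(Vec::new()));
    let stream = Tee { inner: chunks, copy: Arc::clone(&copy) };
    (stream, copy)
}
```

The copy is only complete once the forwarded stream has been fully consumed, so a cache built this way should store the buffer after the last chunk, not before.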

I have my code on GitHub.
This project is a naive implementation of an async server that proxies requests with a cache.