Hyper service + lifetimes + generics: cannot get it right

#1

Hi! I am new to Rust. I am trying to create a toy microservice with Hyper, and I have run into some issues I cannot find a solution for.

I initially had a working implementation for a service that lets users read and update key-value pairs. But, then I tried to use a generic repository (using a trait), so that the service could transparently use multiple repositories (e.g., an in-memory repository, a repository backed up by DynamoDB, etc.). Here is when problems started.

I have tried solving this issue on my own, manually adding lifetime specifiers (e.g., 'static), but none of the attempts were successful. I even created a command-line application (not using Hyper) and I was able to “inject” different repository types into the application struct (which was storing an Arc reference to the repository). So, I suppose some of Hyper’s internals are adding some constraints that my code does not satisfy. I will greatly appreciate any help to solve this issue.

Next, I am including the complete code of the microservice. I am aware it is quite a bit of code for a question, but I thought having a fully reproducible example was best. Before the code listing, I have included the first error that the compiler generates (the rest are included at the end; they seem to be all related).

error[E0310]: the parameter type `Repo` may not live long enough
  --> src/main.rs:71:9
   |
62 |   impl<Repo: ValueRepository> NewService for ValueService<Repo> {
   |        ----- help: consider adding an explicit lifetime bound `Repo: 'static`...
...
71 | /         Box::new(future::ok(Self {
72 | |             repo: self.repo.clone(),
73 | |         }))
   | |___________^
   |
note: ...so that the type `futures::FutureResult<ValueService<Repo>, hyper::Error>` will meet its required lifetime bounds
  --> src/main.rs:71:9
   |
71 | /         Box::new(future::ok(Self {
72 | |             repo: self.repo.clone(),
73 | |         }))
   | |___________^

Code listing:

extern crate futures;
extern crate hyper;
#[macro_use]
extern crate serde_derive;
extern crate serde_json;

use std::collections::HashMap;
use std::sync::{Arc, Mutex};

use futures::{future, Future, Stream};
use futures::future::FutureResult;
use hyper::{Body, Chunk, Method, Request, Response, StatusCode};
use hyper::service::{NewService, Service};
use hyper::server::Server;

#[derive(Serialize, Deserialize)]
pub struct Value {
    pub key: String,
    pub value: String,
}

// repository trait

pub trait ValueRepository: Send {
    fn get(&self, key: String) -> Result<String, &str>;
    fn put(&mut self, key: String, value: String);
}

// in-memory repository

pub struct InMemoryRepository {
    dict: HashMap<String, String>,
}

impl InMemoryRepository {
    pub fn new() -> InMemoryRepository {
        InMemoryRepository {
            dict: HashMap::new(),
        }
    }
}

impl ValueRepository for InMemoryRepository {
    fn get(&self, key: String) -> Result<String, &str> {
        match self.dict.get(&key) {
            Some(value) => Ok(value.to_string()),
            _ => Err("Not Found")
        }
    }

    fn put(&mut self, key: String, value: String) {
        self.dict.insert(key, value);
    }
}

// service

pub struct ValueService<Repo: ValueRepository> {
    pub repo: Arc<Mutex<Repo>>
}

impl<Repo: ValueRepository> NewService for ValueService<Repo> {
    type ReqBody = Body;
    type ResBody = Body;
    type Error = hyper::Error;
    type Service = ValueService<Repo>;
    type Future = Box<Future<Item=Self::Service, Error=Self::InitError> + Send>;
    type InitError = hyper::Error;

    fn new_service(&self) -> Self::Future {
        Box::new(future::ok(Self {
            repo: self.repo.clone(),
        }))
    }
}

impl<Repo: ValueRepository> Service for ValueService<Repo> {
    type ReqBody = Body;
    type ResBody = Body;
    type Error = hyper::Error;
    type Future = Box<Future<Item=Response<Body>, Error=Self::Error> + Send>;

    fn call(&mut self, request: Request<Self::ReqBody>) -> Self::Future {
        match request.method() {
            &Method::PUT => self.handle_put_request(request),
            _ => self.bad_request_error_response(),
        }
    }
}

impl<Repo: ValueRepository> ValueService<Repo> {
    pub fn start(self, port: u16) {
        let addr = format!("0.0.0.0:{}", port).parse().unwrap();
        let server = Server::bind(&addr)
            .serve(self)
            .map_err(|e| eprintln!("error: {}", e));

        println!("Serving at {}", addr);
        hyper::rt::run(server);
    }

    fn handle_put_request(&mut self, request: Request<<ValueService<Repo> as Service>::ReqBody>)
                          -> <ValueService<Repo> as Service>::Future {
        let repo = self.repo.clone();

        let resp = request
            .into_body()
            .concat2()
            .and_then(parse_body)
            .and_then(move |value| {
                let repo = &mut *repo.clone().lock().unwrap();
                update_value(value, repo)
            })
            .then(make_put_response);

        Box::new(resp)
    }

    fn bad_request_error_response(&self) -> <ValueService<Repo> as Service>::Future {
        Box::new(future::ok(
            Response::builder()
                .status(StatusCode::BAD_REQUEST)
                .body(Body::empty())
                .unwrap()
        ))
    }
}

fn parse_body(body: Chunk) -> FutureResult<Value, hyper::Error> {
    let value: Value = serde_json::from_slice(&body).unwrap();
    future::ok(value)
}

fn update_value<Repo: ValueRepository>(value: Value, repo: &mut Repo)
                                       -> FutureResult<i64, hyper::Error> {
    repo.put(value.key, value.value);
    future::ok(0)
}

fn make_put_response<Repo: ValueRepository>(result: Result<i64, hyper::Error>)
                                            -> FutureResult<
                                                Response<<ValueService<Repo> as Service>::ResBody>,
                                                hyper::Error
                                            > {
    future::ok(Response::builder()
        .status(StatusCode::OK)
        .body(Body::empty())
        .unwrap())
}


fn main() {
    let entity_repository = Arc::new(Mutex::new(
        InMemoryRepository::new()
    ));

    {
        let mut repo = &mut *entity_repository.lock().unwrap();
        repo.put("foo".to_string(), "bar".to_string());
        repo.put("abc".to_string(), "123".to_string());
    }

    let service = ValueService { repo: entity_repository };
    service.start(8000);
}

Another issue that I had is related to the functions at the very end (e.g., parse_body). I initially wanted to make them methods of ValueService, but I ran into more problems. My initial approach was:

        let resp = request
            .into_body()
            .concat2()
            .and_then(|chunk| self.parse_body(chunk))
            .and_then(move |value| {
                let repo = &mut *repo.clone().lock().unwrap();
                self.update_value(value, repo)
            })
            .then(|result| self.make_put_response(result));

        Box::new(resp)

When doing that I got the following error:

error[E0495]: cannot infer an appropriate lifetime due to conflicting requirements
   --> src/main.rs:109:23
    |
109 |             .and_then(|chunk| self.parse_body(chunk))
    |                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    |
note: first, the lifetime cannot outlive the anonymous lifetime #1 defined on the method body at 102:5...
   --> src/main.rs:102:5
    |
102 | /     fn handle_put_request(&mut self, request: Request<<ValueService<Repo> as Service>::ReqBody>)
103 | |                           -> <ValueService<Repo> as Service>::Future {
104 | |         let repo = self.repo.clone();
105 | |
...   |
116 | |         Box::new(resp)
117 | |     }
    | |_____^
    = note: ...so that the types are compatible:
            expected &&mut ValueService<Repo>
               found &&mut ValueService<Repo>
    = note: but, the lifetime must be valid for the static lifetime...
    = note: ...so that the expression is assignable:
            expected std::boxed::Box<(dyn futures::Future<Error=hyper::Error, Item=hyper::Response<hyper::Body>> + std::marker::Send + 'static)>
               found std::boxed::Box<dyn futures::Future<Error=hyper::Error, Item=hyper::Response<hyper::Body>> + std::marker::Send>

Am I doing something wrong, or the approach using methods is not feasible in this case?

The full error listing for the original issue is included next:

error[E0310]: the parameter type `Repo` may not live long enough
  --> src/main.rs:71:9
   |
62 |   impl<Repo: ValueRepository> NewService for ValueService<Repo> {
   |        ----- help: consider adding an explicit lifetime bound `Repo: 'static`...
...
71 | /         Box::new(future::ok(Self {
72 | |             repo: self.repo.clone(),
73 | |         }))
   | |___________^
   |
note: ...so that the type `futures::FutureResult<ValueService<Repo>, hyper::Error>` will meet its required lifetime bounds
  --> src/main.rs:71:9
   |
71 | /         Box::new(future::ok(Self {
72 | |             repo: self.repo.clone(),
73 | |         }))
   | |___________^

error[E0310]: the parameter type `Repo` may not live long enough
  --> src/main.rs:95:14
   |
91 | impl<Repo: ValueRepository> ValueService<Repo> {
   |      ----- help: consider adding an explicit lifetime bound `Repo: 'static`...
...
95 |             .serve(self)
   |              ^^^^^
   |
note: ...so that the type `ValueService<Repo>` will meet its required lifetime bounds
  --> src/main.rs:95:14
   |
95 |             .serve(self)
   |              ^^^^^

error[E0310]: the parameter type `Repo` may not live long enough
  --> src/main.rs:95:14
   |
91 | impl<Repo: ValueRepository> ValueService<Repo> {
   |      ----- help: consider adding an explicit lifetime bound `Repo: 'static`...
...
95 |             .serve(self)
   |              ^^^^^
   |
note: ...so that the type `hyper::server::conn::spawn_all::NewSvcTask<hyper::server::conn::AddrStream, std::boxed::Box<(dyn futures::Future<Error=hyper::Error, Item=ValueService<Repo>> + std::marker::Send + 'static)>, ValueService<Repo>, hyper::common::exec::Exec, hyper::server::conn::spawn_all::NoopWatcher>` will meet its required lifetime bounds
  --> src/main.rs:95:14
   |
95 |             .serve(self)
   |              ^^^^^

error[E0310]: the parameter type `Repo` may not live long enough
  --> src/main.rs:96:14
   |
91 | impl<Repo: ValueRepository> ValueService<Repo> {
   |      ----- help: consider adding an explicit lifetime bound `Repo: 'static`...
...
96 |             .map_err(|e| eprintln!("error: {}", e));
   |              ^^^^^^^
   |
note: ...so that the type `ValueService<Repo>` will meet its required lifetime bounds
  --> src/main.rs:96:14
   |
96 |             .map_err(|e| eprintln!("error: {}", e));
   |              ^^^^^^^

error[E0310]: the parameter type `Repo` may not live long enough
  --> src/main.rs:96:14
   |
91 | impl<Repo: ValueRepository> ValueService<Repo> {
   |      ----- help: consider adding an explicit lifetime bound `Repo: 'static`...
...
96 |             .map_err(|e| eprintln!("error: {}", e));
   |              ^^^^^^^
   |
note: ...so that the type `hyper::server::conn::spawn_all::NewSvcTask<hyper::server::conn::AddrStream, std::boxed::Box<(dyn futures::Future<Error=hyper::Error, Item=ValueService<Repo>> + std::marker::Send + 'static)>, ValueService<Repo>, hyper::common::exec::Exec, hyper::server::conn::spawn_all::NoopWatcher>` will meet its required lifetime bounds
  --> src/main.rs:96:14
   |
96 |             .map_err(|e| eprintln!("error: {}", e));
   |              ^^^^^^^

error[E0310]: the parameter type `Repo` may not live long enough
  --> src/main.rs:99:9
   |
91 | impl<Repo: ValueRepository> ValueService<Repo> {
   |      ----- help: consider adding an explicit lifetime bound `Repo: 'static`...
...
99 |         hyper::rt::run(server);
   |         ^^^^^^^^^^^^^^
   |
note: ...so that the type `futures::MapErr<hyper::Server<hyper::server::conn::AddrIncoming, ValueService<Repo>>, [closure@src/main.rs:96:22: 96:51]>` will meet its required lifetime bounds
  --> src/main.rs:99:9
   |
99 |         hyper::rt::run(server);
   |   
#2

Most of the errors are because of the 'static requirement on the Repo generic type. The easiest solution is to state that ValueService trait itself requires the impls to be 'static:

pub trait ValueRepository: Send + 'static {
    fn get(&self, key: String) -> Result<String, &str>;
    fn put(&mut self, key: String, value: String);
}

You can also add the 'static bound on Repo in a few places, but that seems more verbose than simply adding that bound to the trait itself.

There were a couple of other compiler errors that I fixed up, nothing too serious though. Here’s the full code that works (well, compiles and starts :slight_smile:) :

extern crate futures;
extern crate hyper;
#[macro_use]
extern crate serde_derive;
extern crate serde_json;

use std::collections::HashMap;
use std::sync::{Arc, Mutex};

use futures::{future, Future, Stream};
use futures::future::FutureResult;
use hyper::{Body, Chunk, Method, Request, Response, StatusCode};
use hyper::service::{NewService, Service};
use hyper::server::Server;

#[derive(Serialize, Deserialize)]
pub struct Value {
    pub key: String,
    pub value: String,
}

// repository trait

pub trait ValueRepository: Send + 'static {
    fn get(&self, key: String) -> Result<String, &str>;
    fn put(&mut self, key: String, value: String);
}

// in-memory repository

pub struct InMemoryRepository {
    dict: HashMap<String, String>,
}

impl InMemoryRepository {
    pub fn new() -> InMemoryRepository {
        InMemoryRepository {
            dict: HashMap::new(),
        }
    }
}

impl ValueRepository for InMemoryRepository {
    fn get(&self, key: String) -> Result<String, &str> {
        match self.dict.get(&key) {
            Some(value) => Ok(value.to_string()),
            _ => Err("Not Found")
        }
    }

    fn put(&mut self, key: String, value: String) {
        self.dict.insert(key, value);
    }
}

// service

pub struct ValueService<Repo: ValueRepository> {
    pub repo: Arc<Mutex<Repo>>
}

impl<Repo: ValueRepository> NewService for ValueService<Repo> {
    type ReqBody = Body;
    type ResBody = Body;
    type Error = hyper::Error;
    type Service = ValueService<Repo>;
    type Future = Box<Future<Item=Self::Service, Error=Self::InitError> + Send>;
    type InitError = hyper::Error;

    fn new_service(&self) -> Self::Future {
        Box::new(future::ok(Self {
            repo: self.repo.clone(),
        }))
    }
}

impl<Repo: ValueRepository> Service for ValueService<Repo> {
    type ReqBody = Body;
    type ResBody = Body;
    type Error = hyper::Error;
    type Future = Box<Future<Item=Response<Body>, Error=Self::Error> + Send>;

    fn call(&mut self, request: Request<Self::ReqBody>) -> Self::Future {
        match request.method() {
            &Method::PUT => self.handle_put_request(request),
            _ => self.bad_request_error_response(),
        }
    }
}

impl<Repo: ValueRepository> ValueService<Repo> {
    pub fn start(self, port: u16) {
        let addr = format!("0.0.0.0:{}", port).parse().unwrap();
        let server = Server::bind(&addr)
            .serve(self)
            .map_err(|e| eprintln!("error: {}", e));

        println!("Serving at {}", addr);
        hyper::rt::run(server);
    }

    fn handle_put_request(&mut self, request: Request<<ValueService<Repo> as Service>::ReqBody>)
                          -> <ValueService<Repo> as Service>::Future {
        let repo = self.repo.clone();

        let resp = request
            .into_body()
            .concat2()
            .and_then(parse_body)
            .and_then(move |value| {
                let repo = Arc::clone(&repo);
                let mut repo = repo.lock().unwrap();
                update_value(value, &mut *repo)
            })
            .then(make_put_response::<Repo>);

        Box::new(resp)
    }

    fn bad_request_error_response(&self) -> <ValueService<Repo> as Service>::Future {
        Box::new(future::ok(
            Response::builder()
                .status(StatusCode::BAD_REQUEST)
                .body(Body::empty())
                .unwrap()
        ))
    }
}

fn parse_body(body: Chunk) -> FutureResult<Value, hyper::Error> {
    let value: Value = serde_json::from_slice(&body).unwrap();
    future::ok(value)
}

fn update_value<Repo: ValueRepository>(value: Value, repo: &mut Repo)
                                       -> FutureResult<i64, hyper::Error> {
    repo.put(value.key, value.value);
    future::ok(0)
}

fn make_put_response<Repo: ValueRepository>(result: Result<i64, hyper::Error>)
                                            -> FutureResult<
                                                Response<<ValueService<Repo> as Service>::ResBody>,
                                                hyper::Error
                                            > {
    future::ok(Response::builder()
        .status(StatusCode::OK)
        .body(Body::empty())
        .unwrap())
}


fn main() {
    let entity_repository = Arc::new(Mutex::new(
        InMemoryRepository::new()
    ));

    {
        let mut repo = &mut *entity_repository.lock().unwrap();
        repo.put("foo".to_string(), "bar".to_string());
        repo.put("abc".to_string(), "123".to_string());
    }

    let service = ValueService { repo: entity_repository };
    service.start(8000);
}
2 Likes
#3

Thank you so much for your answer, and apologies for the late reply. After applying your changes, I faced a few other challenges, and I was waiting to understand them well before asking for more help. In the end I managed to solve them after your answer set me on the right path.

Learning Rust is certainly not easy (even with a C++ background), but it’s being an interesting process :slight_smile:

#4

That’s an opinion shared by many, and probably most, programmers with an imperative-language background. Here’s someone else’s tale of struggle and success in the quest to become proficient with Rust.