[Solved] Lifetime issue with CpuPool


#1

I’m writing a simple RPC server with tokio and futures-cpupool. The server holds a BTreeMap of boxed closures, keyed by function name. The current implementation is pretty straightforward:

pub struct SlackerServiceSync<T>
    where T: Send + Sync + 'static
{
    functions: Arc<BTreeMap<String, RpcFnSync<T>>>,
    pool: CpuPool,
}

impl<T> SlackerServiceSync<T>
    where T: Send + Sync + 'static
{
    pub fn new(functions: Arc<BTreeMap<String, RpcFnSync<T>>>,
               threads: usize)
               -> SlackerServiceSync<T> {
        let pool = CpuPool::new(threads);
        SlackerServiceSync { functions, pool }
    }
}

impl<T> Service for SlackerServiceSync<T>
    where T: Send + Sync + 'static
{
    type Request = SlackerPacket<T>;
    type Response = SlackerPacket<T>;
    type Error = io::Error;
    type Future = BoxFuture<Self::Response, Self::Error>;

    fn call(&self, req: Self::Request) -> Self::Future {
        match req {
            SlackerPacket::Request(sreq) => {
                debug!("getting request: {:?}", sreq.fname);
                if let Some(f) = self.functions.get(&sreq.fname) {
                    self.pool
                        .spawn_fn(move || -> FutureResult<T, Self::Error> {
                                      ok(f(&sreq.arguments))
                                  })
                        .and_then(move |result| {
                            debug!("getting results");
                            ok(SlackerPacket::Response(SlackerResponse {
                                                           version: sreq.version,
                                                           code: RESULT_CODE_SUCCESS,
                                                           content_type: sreq.content_type,
                                                           serial_id: sreq.serial_id,
                                                           result: result,
                                                       }))
                        })
                        .map_err(|_| io::Error::new(io::ErrorKind::Other, "Oneshot canceled"))
                        .boxed()
                } else {
                    let error = SlackerError {
                        version: sreq.version,
                        code: RESULT_CODE_NOT_FOUND,
                        serial_id: sreq.serial_id,
                    };
                    ok(SlackerPacket::Error(error)).boxed()
                }
            }
            SlackerPacket::Ping(ref ping) => {
                ok(SlackerPacket::Pong(SlackerPong { version: ping.version })).boxed()
            }
            _ => err(io::Error::new(io::ErrorKind::InvalidInput, "Unsupported packet")).boxed(),
        }
    }
}

I’m currently blocked by this lifetime issue on self.functions.get(&sreq.fname).

error[E0495]: cannot infer an appropriate lifetime for lifetime parameter in function call due to conflicting requirements
   --> src/service.rs:103:49
    |
103 |                 if let Some(f) = self.functions.get(&sreq.fname) {
    |                                                 ^^^
    |
note: first, the lifetime cannot outlive the anonymous lifetime #1 defined on the body at 99:55...
   --> src/service.rs:99:56
    |
99  |       fn call(&self, req: Self::Request) -> Self::Future {
    |  ________________________________________________________^
100 | |         match req {
101 | |             SlackerPacket::Request(sreq) => {
102 | |                 debug!("getting request: {:?}", sreq.fname);
...   |
133 | |         }
134 | |     }
    | |_____^
note: ...so that reference does not outlive borrowed content
   --> src/service.rs:103:34
    |
103 |                 if let Some(f) = self.functions.get(&sreq.fname) {
    |                                  ^^^^^^^^^^^^^^
    = note: but, the lifetime must be valid for the static lifetime...
note: ...so that the type `[closure@src/service.rs:105:35: 107:36 f:&std::boxed::Box<for<'r> std::ops::Fn(&'r std::vec::Vec<T>) -> T + std::marker::Send + std::marker::Sync>, sreq:packets::SlackerRequest<T>]` will meet its required lifetime bounds
   --> src/service.rs:105:26
    |
105 |                         .spawn_fn(move || -> FutureResult<T, Self::Error> {
    |                          ^^^^^^^^

error: aborting due to previous error

error: Could not compile `slacker`.

Similar code works without CpuPool. To be honest, I don’t fully understand the error the compiler reports. Could anybody help?

Full code is here


#3

Try cloning the Arc that holds the closures and looking up the closure from the clone, inside the closure you hand to the pool. It seems the compiler thinks the reference returned by get may not live long enough to be used in the cpupool closure: the reference is tied to the &self lifetime, while spawn_fn needs a closure that is valid for 'static. If you clone the Arc, you should be able to “detach” that lifetime association.
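Here is a minimal sketch of that first idea. It is not your actual code: std::thread::spawn stands in for CpuPool::spawn_fn (both require a 'static closure), and CloneMapService, RpcFn, sum and demo_clone_map_service are made-up names for illustration, with i64 in place of your T.

```rust
use std::collections::BTreeMap;
use std::sync::Arc;
use std::thread;

// Hypothetical stand-in for RpcFnSync<T>, with T fixed to i64.
type RpcFn = Box<dyn Fn(&Vec<i64>) -> i64 + Send + Sync>;

// Hypothetical, cut-down version of the service struct.
struct CloneMapService {
    functions: Arc<BTreeMap<String, RpcFn>>,
}

impl CloneMapService {
    // Returns None when no function is registered under `fname`.
    fn call(&self, fname: String, args: Vec<i64>) -> Option<i64> {
        // Cloning the Arc only bumps a reference count. The clone is owned,
        // so nothing moved into the closure borrows from `&self`.
        let functions = Arc::clone(&self.functions);
        // thread::spawn is used here in place of CpuPool::spawn_fn;
        // it imposes the same 'static bound on the closure.
        let handle = thread::spawn(move || {
            // The lookup happens inside the closure, against the owned clone.
            functions.get(&fname).map(|f| f(&args))
        });
        handle.join().expect("worker thread panicked")
    }
}

// Example RPC function (an assumption, not from the original code).
fn sum(v: &Vec<i64>) -> i64 {
    v.iter().sum::<i64>()
}

fn demo_clone_map_service() -> CloneMapService {
    let mut map: BTreeMap<String, RpcFn> = BTreeMap::new();
    map.insert("sum".to_string(), Box::new(sum));
    CloneMapService { functions: Arc::new(map) }
}

fn main() {
    let service = demo_clone_map_service();
    println!("{:?}", service.call("sum".to_string(), vec![1, 2, 3]));
}
```

The point is that the map lookup moves into the spawned closure. Looking up first and moving the resulting reference would still borrow from something that is not 'static.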

Or maybe wrap each RpcFnSync closure in an Arc and then move a clone of that Arc into the closure you pass to spawn_fn.
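A rough sketch of the per-closure Arc variant, under the same assumptions as before: std::thread::spawn replaces CpuPool::spawn_fn, and SharedFnService, SharedRpcFn, sum and demo_shared_fn_service are hypothetical names, with i64 standing in for your T.

```rust
use std::collections::BTreeMap;
use std::sync::Arc;
use std::thread;

// Hypothetical replacement for RpcFnSync<T>: an Arc instead of a Box,
// so each closure can be shared by cloning.
type SharedRpcFn = Arc<dyn Fn(&Vec<i64>) -> i64 + Send + Sync>;

// Hypothetical, cut-down version of the service struct.
struct SharedFnService {
    functions: BTreeMap<String, SharedRpcFn>,
}

impl SharedFnService {
    // Returns None when no function is registered under `fname`.
    fn call(&self, fname: &str, args: Vec<i64>) -> Option<i64> {
        // `get` yields a reference tied to `&self`; cloning the Arc turns it
        // into an owned handle that can be moved into a 'static closure.
        let f = Arc::clone(self.functions.get(fname)?);
        // thread::spawn is used here in place of CpuPool::spawn_fn.
        let handle = thread::spawn(move || f(&args));
        Some(handle.join().expect("worker thread panicked"))
    }
}

// Example RPC function (an assumption, not from the original code).
fn sum(v: &Vec<i64>) -> i64 {
    v.iter().sum::<i64>()
}

fn demo_shared_fn_service() -> SharedFnService {
    let mut functions: BTreeMap<String, SharedRpcFn> = BTreeMap::new();
    functions.insert("sum".to_string(), Arc::new(sum));
    SharedFnService { functions }
}

fn main() {
    let service = demo_shared_fn_service();
    println!("{:?}", service.call("sum", vec![4, 5]));
}
```

Compared with cloning the whole map's Arc, this keeps the not-found check outside the worker, which matches the if let / else shape of your call method.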


#4

It works! After wrapping each closure in an Arc and cloning it before sending it to the thread pool, the lifetime error is gone.

Thank you!