Pass a tower::Service in an async FnMut

Hi folks!

I'm building a library where users pass in a tower::MakeService whose services accept a specific struct. The library runs a hyper Server and takes care of transforming each incoming payload into that struct.

As part of that, I need to create my own MakeService and Service to handle the transformation. However, I'm stuck trying to pass the user's Service into the async closure that I hand to tower::service_fn.

What I've tried

  1. If I move the service itself into the closure, the closure becomes FnOnce instead of FnMut, because the service is consumed on the first call.
  2. If I pass a &mut service, I get a lifetime conflict, because the returned future could outlive the borrow of the service.

For now, I'm using an Arc<Mutex<Service>>, but that feels excessive, since a single Service instance only handles one call at a time.
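To make the two attempts and the workaround above concrete, here is a minimal std-only sketch. Everything in it is a hypothetical stand-in rather than a tower or hyper API: `Adder` plays the role of the user's service, `BoxFut` the role of `Service::Future`, and `block_on` is a toy executor that only handles futures that never actually wait. The sketch assumes, as the shape of `Service::call` suggests, that the returned future does not borrow the service.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

/// Owned, boxed future: like `Service::Future`, it cannot borrow the service.
pub type BoxFut = Pin<Box<dyn Future<Output = u32>>>;

/// Hypothetical stand-in for the user's service (not a tower type).
pub struct Adder {
    pub calls: u32,
}

impl Adder {
    /// Same shape as `Service::call`: `&mut self` in, owned future out.
    pub fn call(&mut self, req: u32) -> BoxFut {
        self.calls += 1;
        let calls = self.calls;
        Box::pin(async move { req + calls })
    }
}

/// Attempt 1: the service is moved into the `async move` block, so the
/// closure gives it away on the first call and only implements `FnOnce`.
/// Changing the return type to `impl FnMut(u32) -> BoxFut` fails to compile.
pub fn attempt_one(mut service: Adder) -> impl FnOnce(u32) -> BoxFut {
    move |req| -> BoxFut { Box::pin(async move { service.call(req).await }) }
}

// Attempt 2 (does not compile): borrow instead of move.
//
//     move |req| -> BoxFut {
//         let service = &mut service;
//         Box::pin(async move { service.call(req).await })
//     }
//
// The returned future would hold a borrow of state owned by the closure,
// but it has to be able to outlive that single closure call.

/// Workaround: every call clones a shared handle, so the closure stays `FnMut`.
/// The lock is held only while `call` runs; the returned future is awaited
/// after the guard has been dropped.
pub fn shared_handler(service: Adder) -> impl FnMut(u32) -> BoxFut {
    let shared = Arc::new(Mutex::new(service));
    move |req| -> BoxFut {
        let shared = Arc::clone(&shared);
        Box::pin(async move {
            // Stands in for async work that must finish before the service
            // can be called (reading the request body in my real code).
            let req = std::future::ready(req).await;
            let fut = shared.lock().unwrap().call(req);
            fut.await
        })
    }
}

struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Toy executor: polls in a loop, which is enough for futures that are
/// ready immediately. Not a replacement for a real runtime.
pub fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}
```

Calling `shared_handler(Adder { calls: 0 })` gives a closure that can be invoked repeatedly, while the closure from `attempt_one` can be invoked only once. The sketch skips `poll_ready` entirely, and whether locking only around `call` carries over to a real tower service depends on that service.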

Is there a better way to do this than passing the service in an Arc<Mutex<_>>?

Code

Here's a simplified version of the code I have at the moment:

use hyper::{server::conn::AddrStream, Server, Request, Body, Response};
use std::sync::Arc;
use std::net::SocketAddr;
use tower::{service_fn, make::MakeService, Service};
use tokio::sync::Mutex;

pub type Error = Box<dyn std::error::Error + Send + Sync>;

pub struct MyRequest;
pub struct MyResponse;

/// Wrapper function that transforms a hyper::Request into a MyRequest, and the resulting MyResponse into a hyper::Response
pub async fn wrapper<S>(service: Arc<Mutex<S>>, req: Request<Body>) -> Result<Response<Body>, Error>
where
    S: Service<MyRequest, Response = MyResponse>,
{
    let _body = hyper::body::to_bytes(req.into_body()).await?;
    let my_request = MyRequest;

    {
        let mut service = service.lock().await;
        let _my_response = service.call(my_request).await;
    }

    Ok(hyper::Response::new(hyper::Body::empty()))
}

/// Function that takes a `MakeService` and runs an HTTP server
pub async fn run<'a, M>(mut make_service: M) -> Result<(), Error>
where
    M: MakeService<(), MyRequest, Response = MyResponse> + Send + Sync + 'static,
    M::Service: Service<MyRequest, Response = MyResponse> + Send + Sync,
    <M::Service as Service<MyRequest>>::Future: Send + 'a,
    M::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
    M::MakeError: Into<Box<dyn std::error::Error + Send + Sync>>,
    M::Future: Send,
{
    let make_service = service_fn(move |_: &AddrStream| {
        let service = make_service.make_service(());

        async move {
            // Need to create an Arc<Mutex<_>> here.
            let service = Arc::new(Mutex::new(service.await?));
            Ok::<_, M::MakeError>(service_fn(move |req| wrapper(service.clone(), req)))
        }
    });

    let addr = SocketAddr::from(([0, 0, 0, 0], 8000));
    let server = Server::bind(&addr).serve(make_service);

    server.await?;

    Ok(())
}
