Concurrent access to a hashmap of closures (Axum http server)

Hello there,

I am pretty new to Rust, coming from Scala Typelevel, and my current "hands-on" project is a configurable mock server. For now, it is configured through YAML route descriptions in files such as:
main-api.yaml

name: mvp
headers:
  X-MACHINE-ID: test
rules:
  - matches: POST /my-route/test
    status: 200/text
    body: "{test: 2}"
  - matches: PATCH /other-route/:name/:test
    status: 400/text
    body: regular body

another-api.yaml

name: mvp
headers:
  X-MACHINE-ID: anothertest
rules:
  - matches: POST /my-route/test
    status: 200/text
    body: "{test: 3}"
  - matches: PATCH /other-route/:name/:test
    status: 500/text
    body: another body

api-shape.yaml

name: mvp
shape:
  - POST /my-route/test
  - PATCH /other-route/:name/:test

All API files with the same name are variants, and each should match the shape of the same name.
To choose between the available APIs of the same name, I am working on a solution that checks the request's HTTP headers against the ones described in each API and picks the first match.

To do so, I build one handler per route. The handler checks the request's header values against the first API and returns the corresponding response if they match; otherwise it falls through to the next API, and so on. Each handler (a closure) is stored in a hashmap with the route as the key.

This hashmap should then be accessed concurrently whenever a request is received on a given route. So there are only concurrent reads once the hashmap has been built (it is considered immutable after the build step). But when building this project, I get these errors:

  • dyn Fn(Request<Body>) -> std::option::Option<Response<http_body::combinators::box_body::UnsyncBoxBody<axum::body::Bytes, axum::Error>>> cannot be shared between threads safely

  • dyn Fn(Request<Body>) -> std::option::Option<Response<http_body::combinators::box_body::UnsyncBoxBody<axum::body::Bytes, axum::Error>>> cannot be sent between threads safely

use std::collections::HashMap;
use std::sync::{Arc };
use axum::{Router};
use axum::body::{Body, BoxBody};
use axum::http::{ Method, Request, Response, StatusCode};
use axum::response::IntoResponse;
use axum::routing::{post};
use crate::model::core::{ApiCore, RuleCore, SystemCore};

#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub struct HttpRoute {
    pub route: String,
    pub method: Method
}

fn compute_response(req: &Request<Body>, rule: &RuleCore, api: &ApiCore) -> Option<Response<BoxBody>> {

    // All api headers must match the corresponding headers in the received request
    let matching_request = api.headers.iter()
        .all(|(key, value)|
            req.headers().get(key)
                .map(|req_header_value| req_header_value.eq(value))
                .unwrap_or(false)
        );

    if matching_request {
        let body = rule.body.map(|str| Body::from(str)).unwrap_or(Body::empty());
        let body = axum::body::boxed(body);
        let res = Response::builder()
            .status(rule.status)
            .body(body)
            .unwrap();
        Some(res)
    } else {
        None
    }
}

pub type GeneratedHandler = dyn Fn(Request<Body>) -> Option<Response<BoxBody>>;
pub type HandlersMap = HashMap<HttpRoute, Arc<GeneratedHandler>>;

impl SystemCore {
    pub fn generate_router(&self) -> Router {

        // Build the handlers map (route -> handler)

        let mut handlers: HandlersMap = HashMap::new();

        for api_set in self.api_sets.iter() {
            for api in api_set.apis.iter() {
                for rule in api.rules.into_iter() {

                    let http_route = HttpRoute {
                        route: format!("/{}/{}{}", self.name, &api_set.name, rule.endpoint.route),
                        method: rule.endpoint.method
                    };

                    handlers.entry(http_route)
                        .and_modify(|prev | {
                            *prev = Arc::new(|req| prev(req).or(compute_response(&req, &rule, api)))
                        })
                        .or_insert(Arc::new(|req| compute_response(&req, &rule, api)));

                }
            }
        }

        // make handlers immutable
        let handlers = handlers;

        let shared_handlers = Arc::new(handlers);
        let mut router = Router::new();

        for endpoint in handlers.into_keys().into_iter() {
            router = router.route(
                &*endpoint.route,
                match endpoint.method {
                    Method::POST => post( { // build error here

                        let shared_handlers = Arc::clone(&shared_handlers);

                        move |request: Request<Body>| async {
                            let http_route = HttpRoute {
                                method: request.method().to_owned(),
                                route: request.uri().path().to_string(),
                            };

                            match shared_handlers.get(&http_route) {
                                Some(matching_handler) => {

                                    let matching_handler = Arc::clone(matching_handler);

                                    match matching_handler(request) {
                                        Some(res) => res,
                                        None => StatusCode::NOT_FOUND.into_response()
                                    }
                                }
                                None => StatusCode::NOT_FOUND.into_response()
                            }
                        }
                    }),
                    _ => panic!("unknown http method")
                }
            );
        }

        router
    }
}

Here is my Cargo.toml:

[package]
name = "mochi"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
tokio = { version = "1.29.1", features = ["full"] }
serde = { version = "1.0.166", features = ["derive"] }
serde_json = "1.0.100"
serde_yaml = "0.9.22"
axum = "0.6.18"
axum-macros = "0.3.7"
regex = "1.9.0"
itertools = "0.11.0"

Before ending up with this approach, I tried a different one with shared state through the Extension/State APIs provided by Axum, but without success.

So here are my questions:

  1. What did I miss? I tried wrapping the "GeneratedHandler" type in a read-write lock as recommended in some posts, but in my case that should not be necessary, as there are only concurrent reads?
  2. How would you share a map that only has concurrent reads?
  3. Is Arc necessary at the level of the hashmap values? Shouldn't a Box be enough?

Thanks for your time.

You might get better responses if you paste the full (or more complete) error from cargo check.

You probably need

-pub type GeneratedHandler = dyn        Fn(Request<Body>) -> Option<Response<BoxBody>>;
+pub type GeneratedHandler = dyn Sync + Fn(Request<Body>) -> Option<Response<BoxBody>>;

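or + Send + Sync or something like that. Your two errors mention both "shared between threads" (Sync) and "sent between threads" (Send), so you will most likely need both bounds: a bare dyn Fn(...) trait object promises neither, whatever the closure behind it captures.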
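
To make that concrete, here is a minimal sketch using only the standard library. It is not your axum code: `&str` and `String` stand in for `Request<Body>` and `Response<BoxBody>`, the single `machine_id` argument stands in for the header check, and `build_handlers` and `dispatch` are hypothetical names made up for illustration. What it shows is that once the trait object carries `+ Send + Sync`, a map that is built once and then only read can be shared through a plain `Arc`, with no lock.

```rust
use std::collections::HashMap;
use std::sync::Arc;
use std::thread;

// Assumption: &str / String replace Request<Body> / Response<BoxBody>
// so that the sketch compiles without axum.
type GeneratedHandler = dyn Fn(&str) -> Option<String> + Send + Sync;
type HandlersMap = HashMap<String, Arc<GeneratedHandler>>;

// Hypothetical helper: builds the map once, then freezes it behind an Arc.
fn build_handlers() -> Arc<HandlersMap> {
    let mut handlers: HandlersMap = HashMap::new();

    // `move` makes the closure own its captured data, so it borrows nothing
    // from this function and can outlive it.
    let body = String::from("{test: 2}");
    handlers.insert(
        "/my-route/test".to_string(),
        Arc::new(move |machine_id: &str| {
            if machine_id == "test" {
                Some(body.clone())
            } else {
                None
            }
        }),
    );

    Arc::new(handlers)
}

// Hypothetical helper: read-only lookup, no lock involved.
fn dispatch(handlers: &HandlersMap, route: &str, machine_id: &str) -> Option<String> {
    handlers.get(route).and_then(|handler| handler(machine_id))
}

fn main() {
    let handlers = build_handlers();

    // Each thread gets its own clone of the Arc and only reads the map.
    let workers: Vec<_> = (0..4)
        .map(|_| {
            let handlers = Arc::clone(&handlers);
            thread::spawn(move || dispatch(&handlers, "/my-route/test", "test"))
        })
        .collect();

    for worker in workers {
        assert_eq!(worker.join().unwrap(), Some("{test: 2}".to_string()));
    }
}
```

On your other questions: no RwLock or Mutex should be needed as long as nothing mutates the map after it is wrapped in the outer `Arc`. The inner `Arc` here only mirrors your `HandlersMap`. If handlers are only ever called through a reference obtained from the map, `Box<dyn Fn(...) + Send + Sync>` should work as well, and `Arc` only matters if you want to clone a handler out of the map.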
