Translated JS into RUST for worse performance? What did I do wrong

Hello, I translated by API backend into rust for better performance as it could handle just about 1 requests per second.

When user makes request the the backend will fetch multiple API /RPC calls and return formatted information to user.

For each call It runs about 20 aplications (API call with formatting or blockchain calls)
Some of the API calls can take up to 15 seconds to respond. but i made all functions async so it should not be problem?

The main issue:
When I make more than 1 request per second it responds for like 20 seconds to all requests in time.Then the app completly halts and stops responding at all.
and every requests gets timed out (I have set 30 sec timeout for the functions, request is processed in about 5-10 seconds normaly).

CPU usage is low, the same is for memory. so its not resource bottleneck.
I run some flamegraph tests below is link on 4 tests I did.
(its regular svg but i could not post it here)
https://easyupload.io/m/txuqd8

I tried running all the DappServiceFn with taiko::spawn that I thought could cause the issue so I changed it to future .boxed functions the problem is almost the same.

I started with rust about week ago and this is my first project I ever written in rust.
I might did something stupid but cant spot it and dont know what could kill the request per seconds as much. (even my javascript version could handle the same load for longer time before halting)

If someone is real pro here and can see it right away it would help me a lot <3
I definitly did something wrong with the async and dont want to rewrite everything in Golang just before I cant figure out what is wrong with the async

Below I provided the snippet of the backend
Mainly /quote endpoint (rest of endpoing are doing basicaly the same just minor adjustments, they run load of API and blockchain calls in the end in paralel)

The flow is the script is main.rs --> api.rs --> quote.rs --> quote_service.rs --> and lastly the main script that runs all the dApps in paralel is quote_router.rs It executes all available dApp scripts for given reqeust (usualy around 20 at once I provided below just one example openocean.rs this post will be to long anyway. I can provide more utils script or the dapps in comments if needed and someone suspects there is hudge flaw.

For anyone that managed to read everything, thank you so much.
below is repo, with more all mandatory scripts:
epxteku/API_shared_snippet (github.com)

src/main.rs

#![allow(non_snake_case)]
use std::sync::Arc;
use tokio::task;
use tower::ServiceBuilder;
use tower_http::cors::{Any, CorsLayer};
use tower_http::trace::{TraceLayer, DefaultMakeSpan};
use tracing::{info, Level, Span};
use tracing_subscriber::{EnvFilter, fmt, prelude::*};
use axum::{Server, Router, extract::Extension};
use axum::http::Request;
use api::create_api_routes;
use load_resources::{create_app_state, reload_tokens};
use path_updater::start_all_update_processes;
use std::env;
use std::fs::File;
use pprof::ProfilerGuardBuilder;

pub mod api;
pub mod load_resources;
pub mod paths;
pub mod services;
pub mod utils;
pub mod dapps;
pub mod create_clients;
pub mod path_updater;

#[tokio::main]
async fn main() {
    // Start pprof profiling
    let guard = ProfilerGuardBuilder::default()
        .frequency(1000)
        .blocklist(&["libc", "libgcc", "pthread", "vdso"])
        .build()
        .unwrap();

    // Create a custom EnvFilter
    let filter = EnvFilter::new("off");

    // Initialize tracing for logging
    tracing_subscriber::registry()
        .with(fmt::layer()
            .with_target(true)
            .pretty())
        .with(filter)
        .init();

    // Create the AppState with dapps, chains, tokens, and proxy clients loaded from JSON files
    let state = Arc::new(create_app_state().await);

    // Spawn a background task to reload the tokens.json every 5 minutes
    let state_clone = Arc::clone(&state);
    task::spawn(async move {
        reload_tokens(state_clone).await;
    });
    
    // Create all routes by calling `create_api_routes`
    let app = Router::new()
        .merge(create_api_routes(state))
        .layer(
            ServiceBuilder::new()
                .layer(TraceLayer::new_for_http()
                    .on_request(|request: &Request<_>, _span: &Span| {
                        tracing::info!(
                            "Received a request: {} {}",
                            request.method(),
                            request.uri().path()
                        );
                    })
                    .make_span_with(DefaultMakeSpan::new()
                        .level(Level::INFO)
                    )
                )
        )
        .layer(
            CorsLayer::new()
                .allow_origin(Any)
                .allow_methods(Any)
                .allow_headers(Any),
        );

    // Start the server
    let addr = "0.0.0.0:3000".parse().unwrap();
    info!("Server running on http://{}", addr);
    
    // Run the server
    let server = Server::bind(&addr).serve(app.into_make_service());
    
    // Use tokio::select to run the server and handle a shutdown signal
    tokio::select! {
        _ = server => {},
        _ = tokio::signal::ctrl_c() => {
            println!("Received Ctrl+C, shutting down");
        }
    }

    // Generate pprof report
    if let Ok(report) = guard.report().build() {
        let file = File::create("flamegraph.svg").unwrap();
        report.flamegraph(file).unwrap();
        println!("Flamegraph generated: flamegraph.svg");
    }
}

src/api.rs

use axum::Router;
use std::sync::Arc;
use crate::load_resources::AppState;
use crate::paths::resources::create_resource_routes;
use crate::paths::quote::create_quote_routes;
use crate::paths::quote_stream::create_quote_stream_routes;
use crate::paths::quote_direct::create_quote_direct_routes;
use crate::paths::build_transaction::create_build_transaction_routes;

pub fn create_api_routes(state: Arc<AppState>) -> Router {
    Router::new()
        .merge(create_resource_routes(Arc::clone(&state)))
        .merge(create_quote_routes(Arc::clone(&state)))
        .merge(create_quote_stream_routes(Arc::clone(&state)))
        .merge(create_quote_direct_routes(Arc::clone(&state)))
        .merge(create_build_transaction_routes(Arc::clone(&state)))
}

src/paths/quote.rs

//src/paths/quote.rs
use axum::{Json, Router, routing::{post, get}, extract::Query, extract::Extension, http::StatusCode};
use std::sync::Arc;
use std::collections::HashMap;
use crate::services::quote_service::process_quote;
use crate::load_resources::AppState;
use crate::paths::validate_params::{validate_required_params, format_options};
use serde::{Deserialize, Serialize, Deserializer};
use serde_json::Value;
use tracing::error;


#[derive(Deserialize, Serialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
pub struct QuoteParams {
    #[serde(deserialize_with = "crate::paths::utils::deserialization_helpers::string_or_number_to_u32")]
    pub from_chain_id: u32,
    pub from_address: String,
    #[serde(deserialize_with = "crate::paths::utils::deserialization_helpers::number_to_string")]
    pub amount: String,
    pub from_token_address: String,
    pub to_token_address: String,
    pub to_address: Option<String>,
    #[serde(deserialize_with = "crate::paths::utils::deserialization_helpers::string_or_number_to_option_u32", default)]
    pub to_chain_id: Option<u32>,
    #[serde(default)]
    pub dapps: Vec<String>,
    pub options: Option<Value>,
    #[serde(flatten)]
    pub other_params: HashMap<String, Vec<String>>,
}

#[derive(Deserialize, Serialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
pub struct GetQuoteParams {
    #[serde(deserialize_with = "crate::paths::utils::deserialization_helpers::string_or_number_to_u32")]
    pub from_chain_id: u32,
    pub from_address: String,
    #[serde(deserialize_with = "crate::paths::utils::deserialization_helpers::number_to_string")]
    pub amount: String,
    pub from_token_address: String,
    pub to_token_address: String,
    pub to_address: Option<String>,
    #[serde(deserialize_with = "crate::paths::utils::deserialization_helpers::string_or_number_to_option_u32", default)]
    pub to_chain_id: Option<u32>,
    #[serde(deserialize_with = "crate::paths::utils::deserialization_helpers::string_or_seq")]
    pub dapps: Vec<String>,
    #[serde(deserialize_with = "crate::paths::utils::deserialization_helpers::string_or_number_to_f64", default)]
    pub slippage: f64,
    pub options: Option<serde_json::Value>,
    #[serde(flatten)]
    pub other_params: std::collections::HashMap<String, Vec<String>>,
}


// Custom deserializer for comma-separated dapps
pub fn comma_separated<'de, D>(deserializer: D) -> Result<Vec<String>, D::Error>
where
    D: Deserializer<'de>,
{
    let s: String = Deserialize::deserialize(deserializer)?;
    Ok(s.split(',')
        .map(|v| v.trim().to_string())
        .collect())
}

async fn handle_quote_request(
    Extension(state): Extension<Arc<AppState>>, 
    params: QuoteParams,
    is_post: bool,
) -> Result<Json<Value>, StatusCode> {
    tracing::info!("Handling request for /api/quote");

    let validation = validate_required_params(&params, &state);
    if !validation.valid {
        error!("Validation failed: {}", validation.message);
        return Ok(Json(serde_json::json!({
            "success": false,
            "message": validation.message
        })));
    }

    let to_address = params.to_address.clone().unwrap_or(params.from_address.clone());
    let to_chain_id = params.to_chain_id.unwrap_or(params.from_chain_id);

    let options = if is_post {
        params.options.unwrap_or(serde_json::json!({}))
    } else {
        format_options(&params.other_params)
    };

    let transaction_params = serde_json::json!({
        "fromChainId": params.from_chain_id,
        "fromAddress": params.from_address,
        "amount": params.amount,
        "fromTokenAddress": params.from_token_address,
        "toTokenAddress": params.to_token_address,
        "toAddress": to_address,
        "toChainId": to_chain_id,
        "options": options
    });

    // Print the formatted quote JSON
    tracing::info!("Formatted quote JSON: {}", serde_json::to_string_pretty(&transaction_params).unwrap());

    tracing::debug!("Transaction params: {:?}", transaction_params);

    match process_quote(transaction_params, Arc::clone(&state.tokens), Arc::clone(&state)).await {
        Ok(response) => {
            tracing::debug!("Quote processed successfully: {:?}", response);
            Ok(Json(response))
        }
        Err(error) => {
            tracing::error!("Error processing quote: {}", error);
            Ok(Json(serde_json::json!({
                "success": false,
                "message": format!("Server error: {}", error)
            })))
        }
    }
}

pub async fn post_quote_handler(
    Extension(state): Extension<Arc<AppState>>, 
    Json(params): Json<QuoteParams>,
) -> Result<Json<Value>, StatusCode> {
    tracing::info!("Received POST /api/quote request");
    handle_quote_request(Extension(state), params, true).await
}

pub async fn get_quote_handler(
    Extension(state): Extension<Arc<AppState>>, 
    Query(params): Query<GetQuoteParams>,  // Use GetQuoteParams for GET requests
) -> Result<Json<Value>, StatusCode> {
    tracing::info!("Received GET /api/quote request");

    // Convert GetQuoteParams to QuoteParams before passing to handle_quote_request
    let params = QuoteParams {
        from_chain_id: params.from_chain_id,
        from_address: params.from_address,
        amount: params.amount,
        from_token_address: params.from_token_address,
        to_token_address: params.to_token_address,
        to_address: params.to_address,
        to_chain_id: params.to_chain_id,
        dapps: params.dapps.clone(),
        options: Some(serde_json::json!({
            "slippage": params.slippage,
            "dapps": params.dapps,
        })),
        other_params: params.other_params,
    };

    handle_quote_request(Extension(state), params, false).await
}


pub fn create_quote_routes(state: Arc<AppState>) -> Router {
    Router::new()
        .route("/api/quote", post(post_quote_handler))
        .route("/api/quote", get(get_quote_handler))
        .layer(Extension(state))
}

src/services/quote_service.rs

use std::sync::Arc;
use dashmap::DashMap;
use serde_json::{Value, json};
use crate::services::quote_router::route_quote;
use crate::services::quote_stream_router::route_quote_stream;
use crate::load_resources::AppState;
use tokio::time::{Duration, sleep};
use uuid::Uuid;
use tokio::sync::mpsc;
use tracing::{error, info};
use crate::services::transaction_router::route_transaction_from_quote;

type Cache = Arc<DashMap<String, Value>>;

pub async fn process_quote(params: Value, cache: Cache, state: Arc<AppState>) -> Result<Value, String> {
    info!("Processing quote with params: {:?}", params);

    // Process the quote in a separate task to avoid blocking
    let quote_result = tokio::task::spawn(route_quote(params, Arc::clone(&state))).await
        .map_err(|e| format!("Quote task panicked: {}", e))?;

    match quote_result {
        Ok(response) => {
            let request_id = Uuid::new_v4().to_string();
            
            // Use DashMap's entry API for more efficient insertions
            cache.entry(request_id.clone()).or_insert(response.clone());
            state.quote_cache.entry(request_id.clone()).or_insert(response.clone());

            // Spawn a task to remove the cache entry after 10 minutes
            let cache_clone = Arc::clone(&cache);
            let quote_cache_clone = Arc::clone(&state.quote_cache);
            let req_id_clone = request_id.clone();
            tokio::spawn(async move {
                sleep(Duration::from_secs(600)).await;
                cache_clone.remove(&req_id_clone);
                quote_cache_clone.remove(&req_id_clone);
            });

            let result = json!({
                "requestId": request_id,
                "success": response["success"],
                "data": response.get("data").unwrap_or(&json!([]))
            });

            Ok(result)
        }
        Err(error) => {
            error!("Error in quote service: {}", error);
            Err(format!("Quote service failed: {}", error))
        }
    }
}

src/services/quote_router.rs

use crate::utils::filter_dapps::filter_dapps;
use crate::dapps::AVAILABLE_SERVICES;
use crate::utils::utils::fetch_gas_price;
use crate::load_resources::AppState;
use crate::utils::fetch_token_details::fetch_token_details;
use serde_json::{Value, json};
use std::time::Duration;
use tokio::time::timeout;
use std::sync::Arc;
use tracing::{debug, error};
use futures::future::join_all;

pub async fn route_quote(params: Value, state: Arc<AppState>) -> Result<Value, String> {
    // Clone the params and state to avoid lifetime issues in tasks
    let mut extended_params = params.clone();
    let from_chain_id = params["fromChainId"].as_u64().ok_or("Invalid fromChainId")?;
    let to_chain_id = params["toChainId"].as_u64().unwrap_or(from_chain_id);
    let from_token_address = params["fromTokenAddress"].as_str().ok_or("Invalid fromTokenAddress")?;
    let to_token_address = params["toTokenAddress"].as_str().ok_or("Invalid toTokenAddress")?;
    
    // Define the native zero address
    let zero_address = "0x0000000000000000000000000000000000000000";

    let gas_prices = fetch_gas_price(from_chain_id, Arc::clone(&state)).await
        .map_err(|e| format!("Failed to fetch gas prices: {}", e))?;

    // Extend params with gas prices and set quoteOnly
    extended_params["gasPrices"] = json!(gas_prices);
    extended_params["quoteOnly"] = json!(true);

    // Filter available dapps
    let available_dapps_names = filter_dapps(
        from_token_address,
        from_chain_id,
        to_chain_id,
        Arc::clone(&state)
    );

    debug!("Available DApps: {:?}", available_dapps_names);

    if available_dapps_names.is_empty() {
        return Ok(json!({
            "success": false,
            "message": "No dApps available"
        }));
    }

    // Create a longer-lived Arc clone
    let state_clone = Arc::clone(&state);

    // Pass all tokens (fromToken, toToken, and the zero address token) to fetch_token_details at once
    let tokens_with_chain_ids = vec![
        (from_token_address, from_chain_id),
        (to_token_address, to_chain_id),
        (zero_address, from_chain_id), // Native token (zero address)
    ];

    let token_details = fetch_token_details(tokens_with_chain_ids, &state_clone).await
        .map_err(|e| format!("Failed to fetch token details: {}", e))?;

    // Assume token_details contains details for all tokens in the order they were passed
    let from_token_details = &token_details[0];
    let to_token_details = &token_details[1];
    let native_token_details = &token_details[2];

    extended_params["fromTokenDetails"] = json!(from_token_details);
    extended_params["toTokenDetails"] = json!(to_token_details);
    extended_params["nativeTokenDetails"] = json!(native_token_details); 

    let services_to_run: Vec<_> = if let Some(options) = params.get("options") {
        if let Some(dapps) = options.get("dapps").and_then(|d| d.as_array()) {
            if dapps.is_empty() {
                // If dapps array is empty, use all available dapps
                available_dapps_names.iter()
                    .filter(|dapp| AVAILABLE_SERVICES.contains_key(dapp.as_str()))
                    .map(|dapp| (dapp.to_string(), AVAILABLE_SERVICES[dapp.as_str()]))
                    .collect()
            } else {
                // If dapps array is not empty, use only the specified dapps
                dapps.iter()
                    .filter_map(|dapp| dapp.as_str())
                    .filter(|&dapp| available_dapps_names.contains(&dapp.to_string()) && AVAILABLE_SERVICES.contains_key(dapp))
                    .map(|dapp| (dapp.to_string(), AVAILABLE_SERVICES[dapp]))
                    .collect()
            }
        } else {
            // If dapps key is not present, use all available dapps
            available_dapps_names.iter()
                .filter(|dapp| AVAILABLE_SERVICES.contains_key(dapp.as_str()))
                .map(|dapp| (dapp.to_string(), AVAILABLE_SERVICES[dapp.as_str()]))
                .collect()
        }
    } else {
        // If options is not present, use all available dapps
        available_dapps_names.iter()
            .filter(|dapp| AVAILABLE_SERVICES.contains_key(dapp.as_str()))
            .map(|dapp| (dapp.to_string(), AVAILABLE_SERVICES[dapp.as_str()]))
            .collect()
    };

    if services_to_run.is_empty() {
        return Ok(json!({
            "success": false,
            "message": "No valid quotes found."
        }));
    }

    // Prepare the list of futures without spawning tasks
    let futures = services_to_run.into_iter().map(|(name, service)| {
        let params_clone = extended_params.clone();
        let state_clone = Arc::clone(&state);
        async move {
            match timeout(Duration::from_secs(30), service(params_clone, state_clone)).await {
                Ok(data) => match data {
                    Ok(value) => {
                        if validate_response_format(&value) {
                            Some(json!({
                                "name": name,
                                "data": value
                            }))
                        } else {
                            error!("Invalid response format from {}: {:?}", name, value);
                            None
                        }
                    },
                    Err(e) => {
                        error!("Error in {}: {}", name, e);
                        None
                    }
                },
                Err(_) => {
                    error!("Timeout for {}", name);
                    None
                }
            }
        }
    });

    // Execute all futures concurrently
    let results: Vec<_> = join_all(futures).await
        .into_iter()
        .filter_map(|res| res)
        .collect();
    

    if results.is_empty() {
        return Ok(json!({
            "success": false,
            "message": "No valid quotes found."
        }));
    }

    // Sort and structure the response
    let mut sorted_results: Vec<Value> = results.into_iter()
        .enumerate()
        .map(|(index, result)| {
            json!({
                "id": index + 1,
                "name": result["name"],
                "data": result["data"]
            })
        })
        .collect();

    sorted_results.sort_by(|a, b| {
        let a_amount = a["data"]["toAmount"].as_str().unwrap_or("0");
        let b_amount = b["data"]["toAmount"].as_str().unwrap_or("0");
        b_amount.cmp(a_amount)
    });

    Ok(json!({
        "success": true,
        "data": sorted_results
    }))
}

fn validate_response_format(data: &Value) -> bool {
    data.get("tool").is_some() &&
    data.get("fromChainId").is_some() &&
    data.get("fromAmount").is_some() &&
    data.get("toAmount").is_some() &&
    data.get("fromToken").is_some() &&
    data.get("toToken").is_some() &&
    data.get("transaction").is_some()
}

src/dapps/mod.rs

pub mod jumper;
pub mod odos;
pub mod zero_x;
pub mod inch;
pub mod across;
pub mod balancer;
pub mod bungee;
pub mod debridge;
pub mod dododex;
pub mod hyphen;
pub mod koi;
pub mod kyberswap;
pub mod minibridge;
pub mod native;
pub mod layerswap;
pub mod nitro;
pub mod okx;
pub mod openocean;


pub use self::index::AVAILABLE_SERVICES;

mod index {
    use std::collections::HashMap;
    use serde_json::Value;
    use std::sync::Arc;
    use crate::load_resources::AppState;
    use lazy_static::lazy_static;
    use futures::future::BoxFuture;
    use super::*;
    
    
    // Define a function type for DApp services
    type DappServiceFn = fn(Value, Arc<AppState>) -> BoxFuture<'static, Result<Value, String>>;


    lazy_static! {
        // A map that links DApp names to their respective service functions
        pub static ref AVAILABLE_SERVICES: HashMap<&'static str, DappServiceFn> = {
            let mut m = HashMap::new();
            m.insert("jumper", jumper::get_swap_quote as DappServiceFn);
            m.insert("odos", odos::get_swap_quote as DappServiceFn);
            m.insert("1inch", inch::get_swap_quote as DappServiceFn);
            m.insert("0x", zero_x::get_swap_quote as DappServiceFn);
            m.insert("across", across::get_swap_quote as DappServiceFn);
            m.insert("balancer", balancer::get_swap_quote as DappServiceFn);
            m.insert("bungee", bungee::get_swap_quote as DappServiceFn);
            m.insert("debridge", debridge::get_swap_quote as DappServiceFn);
            m.insert("dododex", dododex::get_swap_quote as DappServiceFn);
            m.insert("hyphen", hyphen::get_swap_quote as DappServiceFn);
            m.insert("koi", koi::get_swap_quote as DappServiceFn);
            m.insert("kyberswap", kyberswap::get_swap_quote as DappServiceFn);
            m.insert("minibridge", minibridge::get_swap_quote as DappServiceFn);
            m.insert("native", native::get_swap_quote as DappServiceFn);
            m.insert("layerswap", layerswap::get_swap_quote as DappServiceFn);
            m.insert("nitro", nitro::get_swap_quote as DappServiceFn);
            m.insert("okx", okx::get_swap_quote as DappServiceFn);
            m.insert("openocean", openocean::get_swap_quote as DappServiceFn);
            m
        };
    }
}

Individual dapps examples:

openocean.rs

// src/dapps/openocean.rs

use crate::load_resources::AppState;
use crate::utils::format_swap_details::format_swap_details;
use serde::Deserialize;
use serde_json::{json, Value};
use std::sync::Arc;
use ethers::utils::format_units;
use tracing::debug;
use futures::future::BoxFuture;
use futures::FutureExt;

#[derive(Debug, Deserialize)]
struct QuoteResponse {
    code: u32,
    data: Option<QuoteData>,
}

#[derive(Debug, Deserialize)]
struct QuoteData {
    to: String,
    data: String,
    value: String,
    #[serde(rename = "outAmount")]
    out_amount: String,
    #[serde(rename = "estimatedGas")]
    estimated_gas: u64,
}

#[derive(Debug, Deserialize)]
struct TokenDetails {
    decimals: u8,
}

pub fn get_swap_quote(params: Value, state: Arc<AppState>) -> BoxFuture<'static, Result<Value, String>> {
    async move {
        // Extract necessary fields from params
        let from_token_details: TokenDetails = serde_json::from_value(params["fromTokenDetails"].clone())
            .map_err(|e| format!("Invalid fromTokenDetails: {}", e))?;
        let from_chain_id = params["fromChainId"]
            .as_u64()
            .ok_or_else(|| "Invalid fromChainId".to_string())?;
        let to_chain_id = params["toChainId"]
            .as_u64()
            .ok_or_else(|| "Invalid toChainId".to_string())?;
        let amount = params["amount"]
            .as_str()
            .ok_or_else(|| "Invalid amount".to_string())?;
        let from_token_address = params["fromTokenAddress"]
            .as_str()
            .ok_or_else(|| "Invalid fromTokenAddress".to_string())?;
        let to_token_address = params["toTokenAddress"]
            .as_str()
            .ok_or_else(|| "Invalid toTokenAddress".to_string())?;
        let to_address = params["toAddress"]
            .as_str()
            .ok_or_else(|| "Invalid toAddress".to_string())?;
        let from_address = params["fromAddress"]
            .as_str()
            .ok_or_else(|| "Invalid fromAddress".to_string())?;
        let options = params["options"]
            .as_object()
            .ok_or_else(|| "Invalid options".to_string())?;
        let gas_prices = params["gasPrices"]
            .as_array()
            .ok_or("Invalid gasPrices")?;
        let quote_only = params["quoteOnly"].as_bool().unwrap_or(false);
        let gas_price_gwei = gas_prices
            .get(1)
            .and_then(|v| v.as_str())
            .ok_or("Invalid gasPriceGwei")?;

        debug!("Processing swap quote with params: {:?}", params);

        // Convert amount to human-readable format
        let amount_in_human_readable = format_units(
            amount
                .parse::<u128>()
                .map_err(|e| format!("Invalid amount: {}", e))?,
            from_token_details.decimals as u32,
        )
        .map_err(|e| e.to_string())?;

        // Parse slippage percentage
        let slippage_percentage = options["slippage"]
            .as_str()
            .and_then(|s| s.parse::<f64>().ok())
            .or_else(|| options["slippage"].as_f64())
            .map(|v| v / 100.0)
            .ok_or_else(|| "Invalid slippage".to_string())?;

        // Fetch quote from OpenOcean API
        let quote = get_quote(
            from_chain_id,
            to_chain_id,
            from_token_address,
            to_token_address,
            &amount_in_human_readable,
            to_address,
            slippage_percentage,
            gas_price_gwei,
            &state,
        )
        .await?;

        // Check if the quote is successful
        if quote.code == 200 && quote.data.is_some() {
            let quote_data = quote.data.unwrap();
            let transaction_data = json!({
                "from": from_address,
                "to": quote_data.to,
                "chainID": from_chain_id,
                "data": quote_data.data,
                "value": quote_data.value
            });
            let estimated_gas_string = quote_data.estimated_gas.to_string();
            let formatted_data = format_swap_details(
                "openocean",
                &params,
                &transaction_data,
                &json!(quote_data.out_amount),
                &json!(quote_data.to),
                &json!(gas_prices),
                if quote_only {
                    Some(&estimated_gas_string)
                } else {
                    None
                },
                None,
                None,
                &state,
            )
            .await
            .map_err(|e| e.to_string())?;

            Ok(formatted_data)
        } else {
            Err("Unable to process quote.".to_string())
        }
    }
    .boxed()
}

async fn get_quote(
    from_chain: u64,
    _to_chain: u64,
    from_token: &str,
    to_token: &str,
    from_amount: &str,
    to_address: &str,
    slippage_percentage: f64,
    gas_price: &str,
    state: &Arc<AppState>,
) -> Result<QuoteResponse, String> {
    let client = reqwest::Client::new();

    let referrer = state.settings["openocean"]["referrer"].as_str().unwrap_or("");
    let fee = state.settings["openocean"]["fee"].as_str().unwrap_or("");
    let disable_fee = state.settings["openocean"]["disableFee"].as_bool().unwrap_or(false);

    let slippage_str = slippage_percentage.to_string();

    let mut params = vec![
        ("inTokenAddress", from_token),
        ("outTokenAddress", to_token),
        ("amount", from_amount),
        ("account", to_address),
        ("gasPrice", gas_price),
        ("slippage", &slippage_str),
        ("referrer", referrer),
        ("sender", to_address),
    ];

    if !disable_fee {
        params.push(("referrerFee", fee));
    }

    let url = format!("https://open-api.openocean.finance/v3/{}/swap_quote", from_chain);

    let response = client.get(&url)
        .query(&params)
        .send()
        .await
        .map_err(|e| format!("Failed to fetch quote: {}", e))?;

    let status = response.status();
    let response_text = response.text().await.map_err(|e| format!("Failed to read response: {}", e))?;

    debug!("Quote API Response (Status {}):\n{}\n", status, response_text);

    if !status.is_success() {
        return Err(format!("API responded with status {}: {}", status, response_text));
    }

    serde_json::from_str(&response_text).map_err(|e| format!("Failed to parse quote response: {}", e))
}

Did you compile with --release before testing performance?

5 Likes

If it is in a public git repo, please link to that. Some people will find it easier to clone the repo and read the code in their editor.

1 Like

What you describe sounds like it could be a deadlock. I've encountered several deadlocks using the dashmap crate. (programming mistakes on my part, not understanding the library)

If I had to guess it could be related to that. Double check you're not hold any *Ref types over an await point. See this blog post and the ones linked within for more details: Beware of the DashMap deadlock – GNUnicorn

It could easily be something else. I don't see anything in the code that's clearly a deadlock, but there's also a lot of code and I didn't look that closely.

1 Like

I don't have the will to do a review currently sorry (hard day at work), but this makes me suspect that either you won't be able to gain any performance by switching languages and/or that your general algorithm is wrong in both languages.

yes used --release for all tests

I uploaded snippet of the codebase, also included all utils scripts that could also be related. Could not uplaod them into the post:
epxteku/API_shared_snippet (github.com)

I write into the DashMap only once at start of the app,
and with every reqeust I write quote information (about 20 lines so it should not cause deadlock?)
also thought DashMap should be not blocking regards to writes and reads.
I read a lot from the AppState and pass it a lot.

only place where i write besides start is this(dont think its the write lock issue):
Also provided the flamegraphs but I dont rly get what exactly is wrong from that.

type Cache = Arc<DashMap<String, Value>>;

pub async fn process_quote(params: Value, cache: Cache, state: Arc<AppState>) -> Result<Value, String> {
    info!("Processing quote with params: {:?}", params);

    // Process the quote in a separate task to avoid blocking
    let quote_result = tokio::task::spawn(route_quote(params, Arc::clone(&state))).await
        .map_err(|e| format!("Quote task panicked: {}", e))?;

    match quote_result {
        Ok(response) => {
            let request_id = Uuid::new_v4().to_string();
            
            // Use DashMap's entry API for more efficient insertions
            cache.entry(request_id.clone()).or_insert(response.clone());
            state.quote_cache.entry(request_id.clone()).or_insert(response.clone());

            // Spawn a task to remove the cache entry after 10 minutes
            let cache_clone = Arc::clone(&cache);
            let quote_cache_clone = Arc::clone(&state.quote_cache);
            let req_id_clone = request_id.clone();
            tokio::spawn(async move {
                sleep(Duration::from_secs(600)).await;
                cache_clone.remove(&req_id_clone);
                quote_cache_clone.remove(&req_id_clone);
            });

            let result = json!({
                "requestId": request_id,
                "success": response["success"],
                "data": response.get("data").unwrap_or(&json!([]))
            });

            Ok(result)
        }
        Err(error) => {
            error!("Error in quote service: {}", error);
            Err(format!("Quote service failed: {}", error))
        }
    }
}

I write into the DashMap only once at start of the app

No point in using DashMap, then.

Looking at your flamegraph, about a quarter of the runtime of route_quote is taken up by manipulating untyped serde_json::Values. (hint: that's not really how serde is supposed to be used)

Thank you for answer <3
Do you mean setting the <&Value> to any not previously set format/type could cause the runtime of the serde_json to be so high?
Like this example below (I use it for most of my functions as I did not think it will have any downside):

pub async fn format_swap_details(
    tool: &str,
    params: &Value,
    transaction: &Value,
    to_amount: &Value,
    approval_address: &Value,
    gas_data: &Value,
    gas_estimate: Option<&str>,
    additional_fee: Option<&Value>,
    dapp_options: Option<&Value>,
    state: &Arc<AppState>
) -> Result<Value, String> {

Or do you think that praising of jsons does cause the hudge runtime? as i did multiple tests before i implemented lot of simple serde_json praising and its nanoseconds for hudge json files so i did not think it should be problem. as i am dealing with files 1000x smaller than my tests

Did you use the same data structure in the tests (including the same use of the Value type) as in the final program?

Yes i did. But even when its taking 25% of the runtime. it should not halt the app after 30 seconds of reqeusts no? it responds without any problem without any delays and then there is breakpoint where it stops responding completly.

Do you sugest i define all the type structures and use them instead of value? should that also help?

I did multiple tests of praising.
The best method for dynamic data with same type strucute is definitly partial method.
its about 4-7 times faster than the serde_json::Value.

To my supriese defining whole type structure (Strongly typed) is almost as bad as serde_json::Value (in some rare scenarios it was twice as fast comapred to Value)

In my code I am also using the the Value on static data that are fetched at start of the script. I tested optimized lookup with hashmap structure and it was about 4000x faster average than partial praise and about 19000x faster than value.

This is definitly way where i can save a lof of runtime

When i run the app on ubuntu server instead of WSL2 I get 5 times as many requests per second capacity and it does not crash even at higher rates.
Even when the server has 100times less computational capacity

check out the function fetch_token_details function

You're holding json_tokens variable which is of type Option<Ref<'_, String, Value>> over the await on line 125.

Try adding a new scope to ensure it is dropped before the await encountered.

1 Like

I noticed that the flamegraph was not complete at all.
I you dont mind checking this one. there could be the actual bottlenect shown

If your app is freezing after 30 seconds, I would run it in a debugger and then once the app seems frozen, pause the debugger and see where each thread is executing. Are all the threads waiting to acquire a lock? Are they waiting indefinitely for a response from a remote Internet server? Or something else?

2 Likes

Dont rly know how to do it, as I am new to programming but will do my best to educate myself :smiley: Will definitly try thank you

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.