Can this implementation of deep cloning an Arc handle multiple requests at the same time?

This is a follow-up to "Make variable unique for each user https server" (#2 by behai) and "Deep copy value of Arc<Mutex<SomeTrait>>>?" (#4 by ONiel).

I had an HTTP server with a variable api, a struct containing API calls and an API key that is unique for each user. The problem was that the API is initialized once in an Arc and gets passed deeper into my code. Because it is an Arc, I had no way to make it unique for each session.

Thanks to @quinedot I now have a way to deep-clone Arcs. This way I can retrieve the session-token from a request, retrieve the API keys linked to that session in the database, and execute API-methods with that unique API key.

This is some code showing the implementation:

use std::io::{stdin,stdout,Write};
use std::sync::Arc;
use tokio::sync::Mutex;
use async_trait::async_trait;
use lazy_static::lazy_static;


lazy_static! {
    // Database where the session token of a user is linked to an API key.
    pub static ref DATABASE: Arc<Mutex<std::collections::HashMap<String, String>>> = Arc::new(Mutex::new(std::collections::HashMap::new())); 
}

pub trait CloneApi {
   fn clone_animal<'s>(&self) -> Arc<Mutex<dyn Api + 's>> where Self: 's;
}

impl<T: Clone + Api> CloneApi for T {
   fn clone_animal<'s>(&self) -> Arc<Mutex<dyn Api + 's>> where Self: 's {
       Arc::new(Mutex::new(self.clone()))
   }
}

#[async_trait]
pub trait AsyncCloneApi {
    async fn async_clone<'s>(&self) -> Arc<Mutex<dyn Api + 's>> where Self: 's;
}

#[async_trait]
impl<T: ?Sized + CloneApi + std::marker::Send> AsyncCloneApi for Arc<Mutex<T>> {
    async fn async_clone<'s>(&self) -> Arc<Mutex<dyn Api + 's>> where Self: 's {
        (*(self.lock().await)).clone_animal()
    }
}

// -----

// Added supertrait bound
#[async_trait]
pub trait Api: CloneApi + Send {
    fn new() -> Self where Self: Sized;
    async fn auth(&mut self, session_token: String) -> Result<(), String>;
    fn get_api_key(&self) -> String;
    async fn ping(&self);
    async fn make_request(&self);
    async fn make_websocket(&self);
}

// Added derive
#[derive(Clone)]
pub struct BinanceApi {
    pub api_key: String,
}

#[async_trait]
impl Api for BinanceApi {
    fn new() -> Self {
        BinanceApi {
           api_key: "".to_string(), 
        }
    }

    async fn auth(&mut self, session_token: String) -> Result<(), String> {
        // Retrieve api key from database by session token.
        let db_guard = DATABASE.lock().await;
        let api_key = match db_guard.get(&session_token) {
            Some(key) => key,
            None => {
                return Err("Error retrieving session".to_string());
            }
        };

        self.api_key = api_key.to_string();
        drop(db_guard);
        
        Ok(())
    }

    fn get_api_key(&self) -> String {
        self.api_key.to_string()
    }

    async fn ping(&self) {
        println!("API pinged...");
    }

    async fn make_request(&self) {
        tokio::time::sleep(tokio::time::Duration::from_millis(1500)).await;
        println!("API instance with key {} made a request...", self.api_key);
    }

    async fn make_websocket(&self) {
        let clone = self.clone();
        tokio::spawn(async move {
            loop {
                println!("API instance with key {} received data from a websocket stream...", clone.api_key);
                tokio::time::sleep(tokio::time::Duration::from_millis(5000)).await;
            }
        });
    }
}

#[tokio::main]
async fn main() {
    // Seed database.
    let mut db_guard = DATABASE.lock().await;
    db_guard.insert("session123".into(), "abc123".into());
    db_guard.insert("session456".into(), "abc456".into());
    db_guard.insert("session789".into(), "abc798".into());
    drop(db_guard);
    

    // Make general API instance for calls the system make which do not require
    // authentication like pinging.
    let api : BinanceApi = Api::new();
    let api = Arc::new(Mutex::new(api));
    
    // HTTP server simulator.
    tokio::spawn(async move {
        loop {
            let mut request = String::new();
            let _= stdout().flush();
            stdin().read_line(&mut request).expect("Invalid request.");
        
            if &*request == "\n" {
                continue;
            }

            let request : std::vec::Vec<&str> = request.split(" ").collect();
            let path = request[0];
            let session_token = request[1].trim();

            match path {
                "/request" => {
                    route_request(session_token, api.async_clone().await).await;
                },
                "/websocket" => {
                    route_websocket(session_token, api.async_clone().await).await;
                },
                "/call-function-with-api" => {
                   route_call_function_with_api(session_token, api.async_clone().await).await; 
                },
                _ => {
                    println!("Invalid request!");
                    continue;
                }
            }
        }
    });
                

    // Keep running.
    tokio::time::sleep(tokio::time::Duration::from_millis(500000)).await;
}

// Routes
pub async fn route_request(session_token: &str, api: Arc<Mutex<dyn Api>>) {
    let mut api_guard = api.lock().await;
    api_guard.auth(session_token.to_string()).await.expect("Error authenticating API.");
    
    println!("Session {} is doing a request with API-key {}", session_token, api_guard.get_api_key());
    api_guard.make_request().await;
    
    drop(api_guard);
}


pub async fn route_websocket(session_token: &str, api: Arc<Mutex<dyn Api>>) {
    let mut api_guard = api.lock().await;
    api_guard.auth(session_token.to_string()).await.expect("Error authenticating API.");
    
    println!("Session {} is making a websocket with API-key {}", session_token, api_guard.get_api_key());
    api_guard.make_websocket().await;
    
    drop(api_guard);
}


pub async fn route_call_function_with_api(session_token: &str, api: Arc<Mutex<dyn Api>>) {
    let mut api_guard = api.lock().await;
    api_guard.auth(session_token.to_string()).await.expect("Error authenticating API.");
    drop(api_guard);

    // Because the API key is set already we don't need async_clone anymore here.
    func(api.clone()).await;
}

pub async fn func(api: Arc<Mutex<dyn Api>>) {
    let api_clone = api.clone();
    tokio::spawn(async move {
        let api_guard = api_clone.lock().await;
        loop {
            println!("API instance with key {} executed func...", api_guard.get_api_key());
            tokio::time::sleep(tokio::time::Duration::from_millis(5000)).await;
        }
    });
}

Is this a reliable implementation? My main worry is that, because of all the locks, this program will fail when many requests from different users come in at the same time.

Are you worried about deadlocks? If so, look for places where two Mutexes are held at a time -- a deadlock cannot occur otherwise. If you find such a case, ensure that the two Mutexes are always acquired in the same order. Acquiring two or more Mutexes in different orders (in different places) is what causes deadlocks.

Edit: I changed the above to say "two Mutexes" instead of "two locks" since I think I may have confused you. You need at least two different Mutexes to cause a deadlock.
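
A minimal sketch of the ordering rule, using std::sync::Mutex and plain threads to stay dependency-free. The Shared struct and the functions record_key, rotate_key and run_concurrently are hypothetical names made up for this illustration, not part of the code above. Both functions hold two Mutexes at once, and both take them in the same order (database first, then api_key), so they cannot deadlock each other:

```rust
use std::sync::{Arc, Mutex};
use std::thread;

// Hypothetical shared state guarded by two independent Mutexes.
struct Shared {
    database: Mutex<Vec<String>>,
    api_key: Mutex<String>,
}

// Lock order: `database` first, then `api_key`.
fn record_key(shared: &Shared) {
    let mut db = shared.database.lock().unwrap();
    let key = shared.api_key.lock().unwrap();
    db.push(key.clone());
}

// Same lock order as `record_key`. Swapping these two lines would make
// a deadlock possible when both functions run concurrently.
fn rotate_key(shared: &Shared, new_key: &str) {
    let db = shared.database.lock().unwrap();
    let mut key = shared.api_key.lock().unwrap();
    *key = format!("{}-{}", new_key, db.len());
}

// Runs both functions from several threads and returns how many keys
// were recorded.
fn run_concurrently() -> usize {
    let shared = Arc::new(Shared {
        database: Mutex::new(Vec::new()),
        api_key: Mutex::new("abc123".to_string()),
    });

    let handles: Vec<_> = (0..8)
        .map(|i| {
            let shared = Arc::clone(&shared);
            thread::spawn(move || {
                if i % 2 == 0 {
                    record_key(&shared)
                } else {
                    rotate_key(&shared, "key")
                }
            })
        })
        .collect();

    for handle in handles {
        handle.join().unwrap();
    }

    let recorded = shared.database.lock().unwrap().len();
    recorded
}
```

Calling run_concurrently() always finishes, because no thread ever waits for database while holding api_key.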

Or are you worried about performance because the locks are serializing the requests?

At first glance the function func looks suspicious: you're taking a lock and then you start an infinite loop, meaning the lock will never be released. If there's any code that tries to lock that same Arc<Mutex<dyn Api>>, it will be stuck forever.
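
One possible way around this is to copy the needed value out while holding the lock only briefly, and then loop without the guard. The sketch below is an assumption-laden illustration, not the original code: it uses std threads and std::sync::Mutex instead of tokio, a bounded loop instead of an infinite one so that it terminates, and the hypothetical names ApiState, spawn_worker and worker_does_not_starve_others.

```rust
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

// Hypothetical stand-in for the API object behind the Mutex.
struct ApiState {
    api_key: String,
}

// Copies the key out under a short-lived lock, then works without it.
fn spawn_worker(
    api: Arc<Mutex<ApiState>>,
    iterations: usize,
) -> thread::JoinHandle<Vec<String>> {
    thread::spawn(move || {
        // The guard is a temporary, so it is dropped at the end of
        // this statement and the Mutex is free again.
        let key = api.lock().unwrap().api_key.clone();

        let mut log = Vec::new();
        for _ in 0..iterations {
            log.push(format!("API instance with key {} executed func...", key));
            thread::sleep(Duration::from_millis(10));
        }
        log
    })
}

// Shows that another caller can still lock the Mutex while the worker
// is running.
fn worker_does_not_starve_others() -> (usize, String) {
    let api = Arc::new(Mutex::new(ApiState {
        api_key: "abc123".to_string(),
    }));

    let worker = spawn_worker(Arc::clone(&api), 3);

    // This would hang forever if the worker held the guard for its
    // whole loop and never finished.
    api.lock().unwrap().api_key = "abc456".to_string();

    let log = worker.join().unwrap();
    let current = api.lock().unwrap().api_key.clone();
    (log.len(), current)
}
```

The trade-off is that the worker keeps using the key it copied, even if the value behind the Mutex changes later. If the loop must see updates, it can lock again inside each iteration instead, as long as the guard is dropped before sleeping.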

The function async_clone takes a lock: (*(self.lock().await)).clone_animal().

For example each time a request is made I use async_clone():

"/request" => {
    route_request(session_token, api.async_clone().await).await;
},

What if two users make a request at exactly the same time?

I am considering a different approach. I have made the following two functions:

fn create_api_instance() -> Arc<dyn ExchangeAPI> {
    let api : api::binance::Binance = ExchangeAPI::new(
        config::REST_API_URL.as_str(),
        config::WEBSOCKET_API_URL.as_str(),
        config::WEBSOCKET_STREAM_URL.as_str(),
        config::EXCHANGE_API_KEY.as_str(),
        config::EXCHANGE_API_SECRET.as_str(),
    );

    Arc::new(api)
}


async fn create_postgresql_instance() -> (Arc<Mutex<Client>>, Connection<Socket, NoTlsStream>) {
    let (client, connection) = tokio_postgres::connect("host=localhost user=postgres dbname=cryptoxonline", NoTls).await.unwrap();

    (Arc::new(Mutex::new(client)), connection)
}

Each time a request is received I do the following:

    // Spawn process to handle incoming streams.
    loop {
        // Stream for HTTP.
        let (mut stream, _) = server.accept().await.unwrap();

        tokio::spawn(async move {
            let api = create_api_instance();
            let psql = create_postgresql_instance().await.0;
            
            if let Err(e) = http::process(&mut stream, psql, api).await {
                eprintln!("Error: {}", e);
            }
        });
    }

Later in my code I can use the session_token to get the correct API keys for the specific user. This way each request has its own instance of both psql and the API. Does that mean a deadlock is not possible?

Then one request will block (wait) until the other request is done cloning. This is not a deadlock.

Based on this confusion, I think your overall question is actually about blocking, not deadlocks, correct?

Note that you don't need an async Mutex unless the Mutex needs to be locked across an await. If it doesn't need to be locked across an await, use a normal std::sync::Mutex as described here.
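
A rough sketch of that pattern, under the assumption that the session lookup itself does not need to await anything: the std::sync::Mutex is wrapped in a struct and only ever locked inside non-async methods, so a guard can never be held across an await. SessionStore and its methods are hypothetical names for this illustration, and std::future::ready stands in for a real network call.

```rust
use std::collections::HashMap;
use std::sync::Mutex;

// Hypothetical wrapper: the Mutex is private and only locked inside
// the short, non-async methods below.
pub struct SessionStore {
    inner: Mutex<HashMap<String, String>>,
}

impl SessionStore {
    pub fn new() -> Self {
        Self {
            inner: Mutex::new(HashMap::new()),
        }
    }

    pub fn insert(&self, session_token: &str, api_key: &str) {
        self.inner
            .lock()
            .unwrap()
            .insert(session_token.to_string(), api_key.to_string());
    }

    // Returns an owned copy so the guard is released before returning.
    pub fn api_key_for(&self, session_token: &str) -> Option<String> {
        self.inner.lock().unwrap().get(session_token).cloned()
    }
}

// The lock is taken and released inside `api_key_for`, before the
// await point. Running this function requires an async runtime.
pub async fn auth(store: &SessionStore, session_token: &str) -> Result<String, String> {
    let key = store
        .api_key_for(session_token)
        .ok_or_else(|| "Error retrieving session".to_string())?;

    // Stand-in for a network call made with the key.
    std::future::ready(()).await;

    Ok(key)
}
```

Because the guard never lives across an await, the lock is only held for the duration of a HashMap lookup or insert.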

You mean blocking is not possible.

Yes, it reduces blocking for each request to use a different connection to each service it uses.

However, it is also sometimes expensive to create such connections, although I don't actually know how expensive it is to create these particular connections. And sometimes there is a limited number of simultaneous connections you're allowed to make to a particular service, or the service itself only handles a limited number of connections at a time. I don't know about ExchangeAPI but a database does usually have such limits; postgres definitely does and you should be able to find a crate with connection pooling for Postgres.

When there are such expenses or limits, the typical approach is to use connection pooling. But even if you will need to use connection pooling eventually, it could make sense to use a connection per request for now, test with a small number of requests at a time for now, and add pooling later.
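
To make the idea concrete, here is a toy, blocking sketch of what a pool does, written with only the standard library. It is not how a real Postgres pooling crate is implemented or used: Connection, Pool and serve_requests are hypothetical names, the "connection" is just a numbered struct, and a real async server would want an async-aware pool rather than a Condvar.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

// Hypothetical stand-in for a real database connection.
pub struct Connection {
    pub id: usize,
}

// A fixed set of connections that requests borrow and give back.
pub struct Pool {
    idle: Mutex<Vec<Connection>>,
    available: Condvar,
}

impl Pool {
    // Creates all connections up front.
    pub fn new(size: usize) -> Self {
        let idle: Vec<Connection> = (0..size).map(|id| Connection { id }).collect();
        Pool {
            idle: Mutex::new(idle),
            available: Condvar::new(),
        }
    }

    // Waits until a connection is free, then hands it out.
    pub fn get(&self) -> Connection {
        let mut idle = self.idle.lock().unwrap();
        loop {
            if let Some(conn) = idle.pop() {
                return conn;
            }
            idle = self.available.wait(idle).unwrap();
        }
    }

    // Returns a connection and wakes one waiting request.
    pub fn put(&self, conn: Connection) {
        self.idle.lock().unwrap().push(conn);
        self.available.notify_one();
    }

    pub fn idle_count(&self) -> usize {
        self.idle.lock().unwrap().len()
    }
}

// Simulates `requests` concurrent requests sharing `pool_size`
// connections and returns how many connections are idle at the end.
fn serve_requests(pool_size: usize, requests: usize) -> usize {
    let pool = Arc::new(Pool::new(pool_size));

    let handles: Vec<_> = (0..requests)
        .map(|_| {
            let pool = Arc::clone(&pool);
            thread::spawn(move || {
                let conn = pool.get();
                thread::sleep(Duration::from_millis(5)); // pretend to query
                pool.put(conn);
            })
        })
        .collect();

    for handle in handles {
        handle.join().unwrap();
    }

    let idle = pool.idle_count();
    idle
}
```

With serve_requests(2, 6), six requests share two connections: at most two run their "query" at once, the rest wait their turn, and no new connection is ever opened after startup.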

Yeah, my latest worry is that I currently await the lock each time I make a database query:

        let query = psql.lock().await
            .query("
                SELECT
                    id
                FROM
                    users
                WHERE
                    session_token = $1
             ", &[&session_token]).await;

psql is of type Psql, defined as type Psql = Arc<Mutex<Client>>; (tokio Mutex). It needs to be a Mutex because the query needs to be mutated due to prepared statements. But could I also use a sync mutex, so that it wouldn't block?

I tried connection pooling but didn't get it to work. I will retry.

A sync mutex will block -- all mutexes block -- but it is much less expensive as explained in the link I sent.
