Tokio-postgres hangs process on connection

Here is an actix server that asynchronously collates a series of requests from a given ip and then attempts to insert them into a postgres db:

main.rs

#[macro_use]
extern crate dotenv_codegen;
use actix_web::{get, web, App, HttpRequest, HttpResponse, HttpServer, Responder};
use serde::Serialize;
use std::{collections::HashMap, sync::Mutex};
use tokio::time::{sleep, Duration};
use tokio_postgres::NoTls;

/// Everything collected so far for one client; stored in the shared map under the client's IP.
#[derive(Serialize, Debug, Clone)]
pub struct Fingerprint {
    /// Key/value pairs received for every key other than `"font-name"`.
    properties: Vec<(String, String)>,
    /// Values received under the `"font-name"` key.
    fonts: Vec<String>,
    /// Headers of the request that created this entry, as (name, value) pairs.
    /// Values that cannot be read as a string are stored as `"opaque"`.
    headers: Vec<(String, String)>,
    /// UTC Unix timestamp taken when the entry was created.
    timestamp: i64,
}

impl Fingerprint {
    /// Files one received key/value pair into the fingerprint.
    ///
    /// A pair whose key is exactly `"font-name"` contributes only its value to
    /// `fonts`; every other pair is appended unchanged to `properties`.
    fn insert(&mut self, (key, value): (String, String)) {
        match key.as_str() {
            "font-name" => self.fonts.push(value),
            _ => self.properties.push((key, value)),
        }
    }
}

/// Postgres connection string, taken from the `DATABASE_URL` entry by the `dotenv!` macro.
const URL: &str = dotenv!("DATABASE_URL");
/// Port the HTTP server binds to on 127.0.0.1.
const PORT: u16 = 8000;

/// Entry point: announces the port, creates the shared fingerprint map and
/// serves the `new_prop` route on `127.0.0.1:PORT` until the server stops.
#[actix_web::main]
async fn main() -> std::io::Result<()> {
    println!("🚀 Server running on port: {}!", PORT);

    // In-progress fingerprints keyed by client IP; every app instance gets a
    // clone of this handle.
    let fingerprints = web::Data::new(Mutex::new(HashMap::<String, Fingerprint>::new()));
    let address = format!("127.0.0.1:{}", PORT);

    let app_factory = move || App::new().app_data(fingerprints.clone()).service(new_prop);

    HttpServer::new(app_factory).bind(address)?.run().await
}

/// Inserts one row for `ip` into the `fingerprints` table.
///
/// Opens a new Postgres connection using `URL`, serialises `fingerprint` to a
/// JSON string and runs a single parameterised `INSERT`. A failed connect is
/// printed; a failure of `connection.await`, of the serialisation or of
/// `execute` panics through `unwrap`.
async fn export_data(ip: &str, fingerprint: Fingerprint) {
    // Log what is about to be exported (only the properties are printed)
    println!("ip: {}, fingerprint: {:#?}", ip, fingerprint.properties);

    match tokio_postgres::connect(&URL, NoTls).await {
        Ok((client, connection)) => {
            // NOTE(review): `connection` is awaited inline, so the `execute` below is only
            // reached once the connection future has completed. tokio-postgres appears to
            // expect this future to be driven on its own task while `client` is in use —
            // confirm against the crate documentation.
            connection.await.unwrap();
            client
                .execute(
                    "INSERT INTO fingerprints (ip, fingerprint) VALUES ($1,$2);",
                    &[&ip, &serde_json::to_string(&fingerprint).unwrap().as_str()],
                )
                .await
                .unwrap();
        }
        Err(e) => {
            println!("Error: {}", e)
        }
    }
}

// Handle Standard Properties
/// Records one `{key}={value}` pair for the requesting client.
///
/// The client is identified by the text before the first `:` of its real
/// remote address. The first request from an address creates a `Fingerprint`
/// (capturing the request headers and the current UTC timestamp), then waits
/// 10 seconds and exports whatever has accumulated for that address. Requests
/// that find an existing entry only append to it.
///
/// Responds with `BadRequest` when `pair.0` is `"308"`, otherwise with `Gone`.
///
/// # Panics
/// Panics if the mutex is poisoned, if no remote address is available, or if
/// the entry is no longer in the map when the export runs.
#[get("/some/url/{key}={value}")]
async fn new_prop(
    data: web::Data<Mutex<HashMap<String, Fingerprint>>>,
    request: HttpRequest,
    pair: web::Path<(String, String)>,
) -> impl Responder {
    // Determine response type
    let response = if pair.0 == "308" {
        HttpResponse::BadRequest()
    } else {
        // Stops browser from re-requesting properties on reload
        HttpResponse::Gone()
    };

    // Obtain lock on data
    let mut map = data.lock().unwrap();
    let con_info = request.connection_info();
    // Keep only the part before the first ':' (drops a trailing ":port").
    // NOTE(review): an IPv6 address also contains ':' and would be cut short — confirm.
    let ip = con_info
        .realip_remote_addr()
        .unwrap()
        .split(':')
        .next()
        .unwrap();

    // Create new Fingerprint or add prop to existing
    match map.get_mut(ip) {
        // Existing entry: append and fall through to the response (no await on this path)
        Some(f) => f.insert(pair.into_inner()),
        None => {
            let mut f = Fingerprint {
                properties: Vec::new(),
                fonts: Vec::new(),
                // Snapshot the headers of the request that created the entry
                headers: request
                    .headers()
                    .iter()
                    .map(|(key, value)| {
                        (
                            key.as_str().to_owned(),
                            value.to_str().unwrap_or("opaque").to_owned(),
                        )
                    })
                    .collect(),
                timestamp: chrono::offset::Utc::now().timestamp(),
            };
            f.insert(pair.into_inner());
            map.insert(ip.to_owned(), f);

            // Release lock on data
            drop(map);

            // Wait x seconds before obtaining a lock on the data and then export to DB
            sleep(Duration::from_millis(10000)).await;
            // NOTE(review): the guard returned by `data.lock()` is a temporary that lives
            // until the end of this statement, so the mutex stays locked for the whole
            // `export_data(...).await`.
            export_data(ip, data.lock().unwrap().remove(ip).unwrap()).await;
        }
    }

    response
}

Cargo.toml

...
[dependencies]
tokio = { version= "1.12.0", features=["time"] }
actix-web = "4.0.0-beta.9"
random-string = "1.0"
chrono = "0.4"
tokio-postgres = "0.7.3"
dotenv_codegen = "0.15.0"
serde = "1.0.117"
serde_json = "1.0.59"

.env

DATABASE_URL=postgres://pi:password@localhost/css

This program works fine up until data is inserted into postgres, at which point, the server ceases to function with no panics and continues to run with no further outputs. Does anyone know why this might be happening?

This line:

export_data(ip, data.lock().unwrap().remove(ip).unwrap()).await;

behaves like this:

// The guard is created first...
let mut lock = data.lock().unwrap();
// ...is still alive while the export is being awaited...
export_data(ip, lock.remove(ip).unwrap()).await;
// ...and the mutex is only released here.
drop(lock);

If anything else on the same thread also calls data.lock() while export_data is running, you will deadlock. Tokio can only switch between tasks at an .await, but the task trying to take the lock is blocked inside lock(), which is not an .await. Tokio therefore can never switch back to the task that holds the lock, and so the lock is never released.

The best way to avoid something like this is to define a wrapper struct around your mutex, and then define non-async utility methods on that struct for each thing you need to do. For example:

struct Fingerprints {
    map: Arc<Mutex<HashMap<String, Fingerprint>>>,
}

impl Fingerprints {
    // This is not async!
    pub fn take_by_ip(&self, ip: &str) -> Option<Fingerprint> {
        data.lock().unwrap().remove(ip)
    }
}

Because the method isn't async, it cannot hold the lock across an .await, so with this pattern it is impossible to repeat the mistake from your original code. You can do something similar for the other operations you perform. Another useful type of method would be the following:

impl Fingerprints {
    /// Runs `func` with exclusive access to the map and returns its result.
    ///
    /// The closure is synchronous, so nothing can be awaited while the lock
    /// is held; the lock is released when this method returns.
    ///
    /// # Panics
    /// Panics if the mutex is poisoned.
    pub fn with_lock<F, T>(&self, func: F) -> T
    where
        F: FnOnce(&mut HashMap<String, Fingerprint>) -> T,
    {
        // The struct's field is named `map`.
        let mut lock = self.map.lock().unwrap();
        func(&mut *lock)
    }
}

which is used like this:

// Create new Fingerprint or add prop to existing.
// The closure runs with the lock held and reports whether this request created the entry.
let should_export = fingerprints.with_lock(|map| {
    match map.get_mut(ip) {
        Some(f) => {
            f.insert(pair.into_inner());
            false
        }
        None => {
            let mut f = Fingerprint {
                properties: Vec::new(),
                fonts: Vec::new(),
                headers: request
                    .headers()
                    .iter()
                    .map(|(key, value)| {
                        (
                            key.as_str().to_owned(),
                            value.to_str().unwrap_or("opaque").to_owned(),
                        )
                    })
                    .collect(),
                timestamp: chrono::offset::Utc::now().timestamp(),
            };
            f.insert(pair.into_inner());
            map.insert(ip.to_owned(), f);

            true
        }
    }
});
if should_export {
    // Wait x seconds; the lock is not held while sleeping
    sleep(Duration::from_millis(10000)).await;
    // `take_by_ip` locks only for the removal and returns an `Option`;
    // export only if the entry is still there
    if let Some(fingerprint) = fingerprints.take_by_ip(ip) {
        export_data(ip, fingerprint).await;
    }
}

The example above highlights how the with_lock pattern prevents the deadlock from earlier: the closure is not async, so it is impossible to use .await inside with_lock. Instead, the closure returns a boolean so that the .await can be performed after the lock has been released.

Though in your case you should probably be spawning the export_data thing as a background task, which you can actually do from inside with_lock.

// Create new Fingerprint or add prop to existing
fingerprints.with_lock(|map| {
    match map.get_mut(ip) {
        Some(f) => f.insert(pair.into_inner()),
        None => {
            let mut f = Fingerprint {
                properties: Vec::new(),
                fonts: Vec::new(),
                headers: request
                    .headers()
                    .iter()
                    .map(|(key, value)| {
                        (
                            key.as_str().to_owned(),
                            value.to_str().unwrap_or("opaque").to_owned(),
                        )
                    })
                    .collect(),
                timestamp: chrono::offset::Utc::now().timestamp(),
            };
            f.insert(pair.into_inner());
            map.insert(ip.to_owned(), f);

            // A spawned task must own everything it uses, so hand it its own
            // handle to the store and an owned copy of the IP.
            // (assumes `fingerprints` is a cheaply clonable shared handle — confirm)
            let fingerprints = fingerprints.clone();
            let ip = ip.to_owned();
            // Spawning only schedules the task; nothing is awaited while the lock is
            // held. The trailing `;` discards the JoinHandle so both arms evaluate to `()`.
            tokio::spawn(async move {
                // Wait x seconds before obtaining a lock on the data and then export to DB
                sleep(Duration::from_millis(10000)).await;
                if let Some(fingerprint) = fingerprints.take_by_ip(&ip) {
                    export_data(&ip, fingerprint).await;
                }
            });
        }
    }
});
6 Likes

What an excellent answer, thank you so much for taking the time to write that! This has completely solved the issue of the program halting.

However, it still does not appear to insert into the database and also still doesn't give any error messages.

Ah, the database issue was caused by the connection needing to be awaited in a separate task:

/// Inserts one row for `ip` into the `fingerprints` table.
///
/// Opens a new Postgres connection using `URL`, hands the connection future
/// to a background task, and runs a single parameterised `INSERT` with the
/// fingerprint converted to a JSON value. Every failure (connect, connection,
/// serialisation, insert) is reported on the console instead of panicking.
async fn export_data(ip: &str, fingerprint: Fingerprint) {
    let (client, connection) = match tokio_postgres::connect(&URL, NoTls).await {
        Ok(parts) => parts,
        Err(e) => {
            println!("Error: {}", e);
            return;
        }
    };

    // The connection future does the actual communication with the database,
    // so it has to run concurrently with the query below rather than before it.
    tokio::spawn(async move {
        if let Err(e) = connection.await {
            eprintln!("connection error: {}", e);
        }
    });

    // NOTE: passing a `serde_json::Value` as a query parameter requires the
    // `with-serde_json-1` feature of tokio-postgres.
    let payload = match serde_json::to_value(fingerprint) {
        Ok(value) => value,
        Err(e) => {
            eprintln!("serialization error: {}", e);
            return;
        }
    };

    // Report a failed insert instead of panicking.
    if let Err(e) = client
        .execute(
            "INSERT INTO fingerprints (ip, fingerprint) VALUES ($1,$2);",
            &[&ip, &payload],
        )
        .await
    {
        eprintln!("insert error: {}", e);
    }
}

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.