Actix-web with a separate consumer thread

I have an actix-web server that basically does an ETL process. I can invoke a URL to tell the system to start the ETL process for a given customer_id. The ETL process is costly, and I don't want to process the same customer multiple times in a row, so I have list of customer_ids pending to process, so:

  1. An actix-web handler listening to the URL /import/{customer_id} will keep on adding entries to the Vec if they are not already there.

  2. A separate worker thread should keep looping over the list, and process all entities in the list. If the list is empty, well, I'm happy for now if it just sleeps for 5 seconds and starts over.

The problem is that I cannot find a way to share the list between the worker thread and Actix shared data: Actix owns the shared data, so I don't see any way for a separate thread to have a mutable reference to that data?


use std::{borrow::Borrow, thread, time::Duration, vec};
use actix_web::{get, web, App, HttpResponse};
use actix_web::{web::Data, HttpServer};
use std::sync::Mutex;


// #[derive(Copy, Clone)]
pub struct SharedData {
    pending_indexes: Mutex<Vec<u32>>,
}



#[get("/import/{customer_id}")]
async fn import(
    web::Path(customer_id): web::Path<u32>,
    shared_data: actix_web::web::Data<SharedData>,
) -> HttpResponse {
    let mut pending_indexes = shared_data.borrow().pending_indexes.lock().unwrap();

    if !pending_indexes.contains(&customer_id) {
        pending_indexes.push(customer_id);
    }

    HttpResponse::Ok().json("Ok")
}

#[actix_web::main]
async fn main() -> std::io::Result<()> {
    let shared_data = Data::new(SharedData {
        pending_indexes: Mutex::new(vec![]),
    });

   // this is where it fails, as shared_data has been moved
    let handle = thread::spawn(|| loop {
        let i = &mut shared_data.pending_indexes.lock().unwrap();
        if let Some(customer_id) = i.pop() {
           // debug!("Indexing customer {}", customer_id);
        } else {
          //  debug!("Nothing to index, sleeping for 5secs");
            thread::sleep(Duration::from_secs(5));
        }
    });


    let http_server =
        HttpServer::new(move || App::new().app_data(shared_data.clone()).service(import));
    http_server.bind(format!("0.0.0.0:{}", 8080))?.run().await
}

Any help appreciated, thanks in advance!

1 Like

Data is a reference-counted pointer based on Arc, so you can clone it cheaply to get a new pointer to the same shared data:

#[actix_web::main]
async fn main() -> std::io::Result<()> {
    let shared_data = Data::new(SharedData {
        pending_indexes: Mutex::new(vec![]),
    });
    
    let data = shared_data.clone(); // only copies the pointer
    let handle = thread::spawn(|| loop {
        let i = &mut data.pending_indexes.lock().unwrap();
        if let Some(customer_id) = i.pop() {
           // debug!("Indexing customer {}", customer_id);
        } else {
          //  debug!("Nothing to index, sleeping for 5secs");
            thread::sleep(Duration::from_secs(5));
        }
    });


    let http_server =
        HttpServer::new(move || App::new().app_data(shared_data.clone()).service(import));
    http_server.bind(format!("0.0.0.0:{}", 8080))?.run().await
}

Excellent, that did it: I now have this separate thread that checks, every 5 seconds, if there's some entry to process. But I've encountered something weird. Here is the thread now:

let data = shared_data.clone();
    thread::spawn(move || loop {
        let i = &mut data.pending_indexes.lock().unwrap();
        let x = i.pop();
        drop(i);

        if let Some(customer_id) = x {
            debug!("Indexing customer {}", customer_id);
        } else {
            debug!("Nothing to index, sleeping for 5secs");
            thread::sleep(Duration::from_secs(5));
        }
    });

As you can see, it locks the list, then gets an entry, and then drops the mutex guard explicitly. After droping the mutex, it sleeps for 5 seconds.
The problem is that the mutex is not released, so the web requests coming from Actix cannot aquire a lock. But if I remove the line that sleeps for 5 seconds in the standalone thread, then the Actix threads can successfully get a lock.

I was expecting that If I sleep for for those 5 seconds, there should be no lock in the pending_index Vec for that period, no?

i in that code is not a MutexGuard. It has type &mut MutexGuard. Dropping a reference has no effect, because references don't have destructors.

If you want to drop the MutexGuard explicitly, you will need to bind the guard itself to a variable:

let mut i = data.pending_indexes.lock().unwrap();
let x = i.pop();
drop(i);

Oh, silly me, thanks, I was indeed droping a refrence :slight_smile:

Thanks for the channels suggestion, I did try it at first but I coulndn't manage to compile it, but I think I now see the problem, and I should now be able to fix it.

Thanks again!

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.