I have an actix-web server that basically does an ETL process. I can invoke a URL to tell the system to start the ETL process for a given customer_id. The ETL process is costly, and I don't want to process the same customer multiple times in a row, so I keep a list of customer_ids that are pending processing:
An actix-web handler listening to the URL /import/{customer_id} will keep on adding entries to the Vec if they are not already there.
A separate worker thread should keep looping over the list, and process all entities in the list. If the list is empty, well, I'm happy for now if it just sleeps for 5 seconds and starts over.
The problem is that I cannot find a way to share the list between the worker thread and Actix shared data: Actix owns the shared data, so I don't see any way for a separate thread to have a mutable reference to that data?
use std::{borrow::Borrow, thread, time::Duration, vec};
use actix_web::{get, web, App, HttpResponse};
use actix_web::{web::Data, HttpServer};
use std::sync::Mutex;

// #[derive(Copy, Clone)]
pub struct SharedData {
    pending_indexes: Mutex<Vec<u32>>,
}

#[get("/import/{customer_id}")]
async fn import(
    web::Path(customer_id): web::Path<u32>,
    shared_data: actix_web::web::Data<SharedData>,
) -> HttpResponse {
    let mut pending_indexes = shared_data.borrow().pending_indexes.lock().unwrap();
    if !pending_indexes.contains(&customer_id) {
        pending_indexes.push(customer_id);
    }
    HttpResponse::Ok().json("Ok")
}

#[actix_web::main]
async fn main() -> std::io::Result<()> {
    let shared_data = Data::new(SharedData {
        pending_indexes: Mutex::new(vec![]),
    });
    // this is where it fails, as shared_data has been moved
    let handle = thread::spawn(|| loop {
        let i = &mut shared_data.pending_indexes.lock().unwrap();
        if let Some(customer_id) = i.pop() {
            // debug!("Indexing customer {}", customer_id);
        } else {
            // debug!("Nothing to index, sleeping for 5secs");
            thread::sleep(Duration::from_secs(5));
        }
    });
    let http_server =
        HttpServer::new(move || App::new().app_data(shared_data.clone()).service(import));
    http_server.bind(format!("0.0.0.0:{}", 8080))?.run().await
}
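For reference, here is a minimal std-only sketch of the sharing pattern being asked about. It assumes a plain Arc in place of actix's Data (Data wraps an Arc internally), and the helpers enqueue and drain_in_worker are hypothetical names, not part of the code above. The idea is that no thread needs a mutable reference to the shared struct: each side holds its own clone of the handle, and the Mutex provides the mutability.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

// Stand-in for the SharedData struct from the question.
pub struct SharedData {
    pending_indexes: Mutex<Vec<u32>>,
}

// Hypothetical helper: what the handler does, minus the HTTP parts.
// Returns true if the id was newly added.
fn enqueue(shared: &SharedData, customer_id: u32) -> bool {
    let mut pending = shared.pending_indexes.lock().unwrap();
    if pending.contains(&customer_id) {
        false
    } else {
        pending.push(customer_id);
        true
    }
}

// Hypothetical helper: the worker owns its own handle to the shared data.
// It stops when the list is empty so the example terminates.
fn drain_in_worker(shared: Arc<SharedData>) -> Vec<u32> {
    let worker = thread::spawn(move || {
        let mut processed = Vec::new();
        loop {
            // The guard is a temporary here, so the lock is released
            // at the end of this statement.
            let next = shared.pending_indexes.lock().unwrap().pop();
            match next {
                Some(customer_id) => processed.push(customer_id),
                None => break,
            }
        }
        processed
    });
    worker.join().unwrap()
}

fn main() {
    let shared = Arc::new(SharedData {
        pending_indexes: Mutex::new(vec![]),
    });
    enqueue(&shared, 7);
    enqueue(&shared, 7); // duplicate, ignored
    enqueue(&shared, 9);

    // Clone the handle before moving it into the thread; the original
    // stays available for the HTTP server side.
    let processed = drain_in_worker(Arc::clone(&shared));
    println!("processed {:?}", processed);
}
```

In the actix version, the equivalent step would be calling shared_data.clone() before thread::spawn and moving that clone into the closure.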
Excellent, that did it: I now have this separate thread that checks, every 5 seconds, if there's some entry to process. But I've encountered something weird. Here is the thread now:
let data = shared_data.clone();
thread::spawn(move || loop {
    let i = &mut data.pending_indexes.lock().unwrap();
    let x = i.pop();
    drop(i);
    if let Some(customer_id) = x {
        debug!("Indexing customer {}", customer_id);
    } else {
        debug!("Nothing to index, sleeping for 5secs");
        thread::sleep(Duration::from_secs(5));
    }
});
As you can see, it locks the list, then gets an entry, and then drops the mutex guard explicitly. After dropping the mutex guard, it sleeps for 5 seconds.
The problem is that the mutex is not released, so the web requests coming from Actix cannot acquire the lock. But if I remove the line that sleeps for 5 seconds in the standalone thread, then the Actix threads can successfully get the lock.
I was expecting that if I sleep for those 5 seconds, there should be no lock on the pending_indexes Vec for that period, no?
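A small std-only sketch of what seems to be going on, assuming the cause is the `&mut` in front of the lock call: `i` is then a reference to a hidden temporary guard, and drop(i) drops only the reference, so the guard stays alive until the end of the enclosing block (which includes the sleep). The helper names is_free, pop_dropping_reference and pop_dropping_guard are made up for this illustration.

```rust
use std::sync::Mutex;
use std::thread;

// Hypothetical helper: reports whether another thread could take the lock now.
fn is_free(m: &Mutex<Vec<u32>>) -> bool {
    thread::scope(|s| s.spawn(|| m.try_lock().is_ok()).join().unwrap())
}

// Mirrors the thread body above: binds a reference to the guard.
// Recent compilers warn about dropping a reference, hence the allow.
#[allow(dropping_references)]
fn pop_dropping_reference(m: &Mutex<Vec<u32>>) -> (Option<u32>, bool) {
    // `i` is a `&mut MutexGuard`; the guard itself is a temporary that
    // lives until the end of this function.
    let i = &mut m.lock().unwrap();
    let x = i.pop();
    drop(i); // drops only the reference, the lock is still held
    (x, is_free(m))
}

// Binds the guard itself, so drop really releases the lock.
fn pop_dropping_guard(m: &Mutex<Vec<u32>>) -> (Option<u32>, bool) {
    let mut guard = m.lock().unwrap();
    let x = guard.pop();
    drop(guard); // the guard is dropped here, releasing the lock
    (x, is_free(m))
}

fn main() {
    let m = Mutex::new(vec![1, 2]);
    println!("{:?}", pop_dropping_reference(&m)); // lock still held inside
    println!("{:?}", pop_dropping_guard(&m)); // lock released inside
}
```

Another option, under the same assumption, is to pop in a single statement (data.pending_indexes.lock().unwrap().pop()), so the guard never gets a name and is released when that statement ends.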
Oh, silly me, thanks, I was indeed dropping a reference.
Thanks for the channels suggestion. I did try it at first but couldn't get it to compile. I think I now see the problem, though, and I should be able to fix it.
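For anyone landing here later, a rough std-only sketch of what the channel approach could look like, assuming std::sync::mpsc is the kind of channel that was meant. The function run_worker and its idle parameter are hypothetical. Note that a plain channel does not deduplicate customer ids the way the Vec check does, so that would need separate tracking.

```rust
use std::sync::mpsc::{self, Receiver, RecvTimeoutError};
use std::thread;
use std::time::Duration;

// Hypothetical worker loop: waits for ids instead of polling a locked Vec.
// It returns once every sender has been dropped.
fn run_worker(rx: Receiver<u32>, idle: Duration) -> Vec<u32> {
    let mut processed = Vec::new();
    loop {
        match rx.recv_timeout(idle) {
            // An id arrived: this is where the ETL work would happen.
            Ok(customer_id) => processed.push(customer_id),
            // Nothing arrived within `idle`: just wait again.
            Err(RecvTimeoutError::Timeout) => continue,
            // All senders are gone and the queue is empty: stop.
            Err(RecvTimeoutError::Disconnected) => break,
        }
    }
    processed
}

fn main() {
    let (tx, rx) = mpsc::channel();
    let worker = thread::spawn(move || run_worker(rx, Duration::from_secs(5)));

    // The handler side would hold a Sender and call send() per request.
    tx.send(7).unwrap();
    tx.send(9).unwrap();
    drop(tx); // closing the channel lets the worker finish

    println!("processed {:?}", worker.join().unwrap());
}
```

With this shape there is no explicit sleep: the worker blocks in recv_timeout and wakes up as soon as an id is sent.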