We have an app that does mass HTTP downloads, async. It uses a semaphore to respect rate limits (50). Yet the in-flight requests are exceeding that number.
pub async fn go(&self, requests: Vec<Request>) -> Result<String, String> {
let counter = Arc::new(AtomicU64::new(0));
let in_flight = Arc::new(AtomicU64::new(0));
let completed = Arc::new(AtomicU64::new(0));
let semaphore = Arc::new(Semaphore::new(50));
let tracker = TaskTracker::new();
for request in requests {
let semaphore = Arc::clone(&semaphore);
let permit = semaphore.acquire_owned().await.unwrap();
let counter = counter.clone();
let in_flight = in_flight.clone();
let completed = completed.clone();
tracker.spawn(async move {
let current_count = counter.fetch_add(1, Ordering::SeqCst) + 1;
let in_flight_count = in_flight.fetch_add(1, Ordering::SeqCst) + 1;
info!(
"Starting request #{}; {} in flight",
current_count, in_flight_count
);
let result = run(&request).await;
drop(permit);
in_flight.fetch_sub(1, Ordering::SeqCst);
});
}
tracker.close();
tracker.wait().await;
Ok("".to_owned())
}