Spot the bug? :)

We have an app that does mass HTTP downloads asynchronously. It uses a semaphore (50 permits) to respect our rate limits, yet the number of in-flight requests is exceeding that limit.

    pub async fn go(&self, requests: Vec<Request>) -> Result<String, String> {
        let counter = Arc::new(AtomicU64::new(0));
        let in_flight = Arc::new(AtomicU64::new(0));
        let completed = Arc::new(AtomicU64::new(0));
        let semaphore = Arc::new(Semaphore::new(50));
        let tracker = TaskTracker::new();

        for request in requests {
            let semaphore = Arc::clone(&semaphore);
            let permit = semaphore.acquire_owned().await.unwrap();
            let counter = counter.clone();
            let in_flight = in_flight.clone();
            let completed = completed.clone();

            tracker.spawn(async move {
                let current_count = counter.fetch_add(1, Ordering::SeqCst) + 1;
                let in_flight_count = in_flight.fetch_add(1, Ordering::SeqCst) + 1;
                info!(
                    "Starting request #{}; {} in flight",
                    current_count, in_flight_count
                );

            let _result = run(&request).await;
                drop(permit);
                in_flight.fetch_sub(1, Ordering::SeqCst);
            });
        }

        tracker.close();
        tracker.wait().await;

        Ok("".to_owned())
    }

I don't think you can actually have more requests in flight concurrently than the limit, but the `in_flight` counter you observe can exceed 50. It's just a race condition: you release the semaphore permit before decrementing the counter, so a new task can acquire the freed permit and increment `in_flight` before the finished task gets around to decrementing it.
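Here's a std-only sketch of the fix (no tokio, so a channel of tokens stands in for the semaphore's permits, and `demo`/`LIMIT` are names invented for the example): decrement `in_flight` first, *then* release the permit, and the observed counter can never exceed the limit.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{mpsc, Arc, Mutex};
use std::thread;

const LIMIT: u64 = 5;

// Runs 100 "requests" through a 5-permit gate and returns the highest
// in_flight value that was ever observed.
fn demo() -> u64 {
    let in_flight = Arc::new(AtomicU64::new(0));
    let max_seen = Arc::new(AtomicU64::new(0));

    // A channel pre-loaded with LIMIT tokens stands in for the semaphore.
    let (tx, rx) = mpsc::channel();
    for _ in 0..LIMIT {
        tx.send(()).unwrap();
    }
    let rx = Arc::new(Mutex::new(rx));

    let handles: Vec<_> = (0..100)
        .map(|_| {
            let (in_flight, max_seen) = (in_flight.clone(), max_seen.clone());
            let (tx, rx) = (tx.clone(), rx.clone());
            thread::spawn(move || {
                rx.lock().unwrap().recv().unwrap(); // acquire a permit
                let now = in_flight.fetch_add(1, Ordering::SeqCst) + 1;
                max_seen.fetch_max(now, Ordering::SeqCst);
                // ... the request would run here ...
                in_flight.fetch_sub(1, Ordering::SeqCst); // decrement FIRST
                tx.send(()).unwrap(); // THEN release the permit
            })
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }
    max_seen.load(Ordering::SeqCst)
}

fn main() {
    println!("max in flight observed: {}", demo());
}
```

With the original ordering (release, then decrement), the same harness can report a maximum above the limit; with this ordering it cannot, because the decrement happens while the permit is still held.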


You might be interested in streams: try_buffer_unordered does this for you, with no manual bookkeeping: TryStreamExt in futures::stream - Rust
