Best way to handle groups of parallel async tasks?

I am building a tool that downloads a bunch of files from our backend server in parallel. My first, naive attempt resulted in a "Too many open files" error (macOS has a default limit of 256 open file descriptors per process, which I don't want to exceed).

I then asked Async: Best way to download many files without overloading the client and/or the server? (native & web/wasm) - #6 by bes, which helped me.

Now I have a similar problem: I have a buffer of tasks in a FuturesUnordered (let's call this A) that execute in parallel, and each such task itself has a FuturesUnordered (this is B) whose futures also run in parallel (bounded to n = 32).

So A is the set of high-level tasks, e.g. resource group 1, 2, 3, etc., and each B is the set of files to be downloaded within that resource group:

Root
 └A1┐
    B1-download 1, 2, 3
    B2-...
    B3-...
 └A2┐
    B4-...
    B5-...
    ...
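In code, the shape is roughly this (the types and function names are simplified stand-ins, and the per-group bound of 32 is elided):

use futures::stream::{FuturesUnordered, StreamExt};

// Simplified stand-in for the real download call.
async fn download_file(_url: &str) { /* reqwest call goes here */ }

// Level B: the files of one resource group, downloaded in parallel
// (bounded to 32 in the real code).
async fn download_group(files: &[String]) {
    let mut b: FuturesUnordered<_> = files.iter().map(|f| download_file(f)).collect();
    while b.next().await.is_some() {}
}

// Level A: the resource groups themselves, also running in parallel.
async fn download_everything(groups: &[Vec<String>]) {
    let mut a: FuturesUnordered<_> = groups.iter().map(|g| download_group(g)).collect();
    while a.next().await.is_some() {}
}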

My question is: does there exist a utility type or library to manage a global maximum of concurrent futures across all sub-FuturesUnordered?

My current idea is to pass around a tokio::sync::Semaphore which each download task acquires and .awaits. I think this approach will solve the "Too many open files" problem, but will it cause other problems instead?

Something like this:

async fn group_b(
    &self,
    // Wraps tokio::sync::Semaphore
    permit_fetcher: &dyn PermitFetcher,
) -> Result</**/> {
    let mut group = FuturesUnordered::new();

    for /* some loop */ {
        // Create a download
        group.push(async move {
            // Wait for a globally shared permit before starting the download
            let _permit = permit_fetcher.acquire().await?;
            // Perform http call using reqwest
            let data = self.api_call(/**/).await?;
            // The rest is not interesting
            ...
        });
    }

    while let Some(data_result) = group.next().await {
        let _ = data_result?;
    }

    Ok(/**/)
}

Use StreamExt::buffer_unordered().

use futures::stream::StreamExt;

impl Op {
    async fn download(&self) -> Resp { ... }
}

async fn download_all(ops: &[Op], max_in_flight: usize) -> Vec<Resp> {
    futures::stream::iter(ops)
        .map(|op| async move { op.download().await })
        .buffer_unordered(max_in_flight)
        .collect()
        .await
}

Thank you, but that isn't a solution for me. Each of my B groups is independently fallible, and each group collects some metadata from the for loop that needs to be returned.

What I think I want is a "global" kind of buffer_unordered that can "detach" sub-groups that are independent of the main buffer. The main buffer would just maintain a global maximum of concurrent tasks.

I think my Semaphore variant achieves that, but in a sub-optimal way, perhaps?
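For concreteness, here is roughly what I mean, sketched with an Arc<tokio::sync::Semaphore> shared across every group (the FileMeta / Downloaded types, the run_* names, and the direct reqwest::get calls are all made up for illustration):

use std::sync::Arc;

use futures::stream::{FuturesUnordered, StreamExt};
use tokio::sync::Semaphore;

// Made-up types just for illustration.
struct FileMeta { url: String }
struct Downloaded { meta: FileMeta, bytes: Vec<u8> }

// One B group: independently fallible, returns its own collected data.
async fn run_group(
    files: Vec<FileMeta>,
    limit: Arc<Semaphore>,
) -> Result<Vec<Downloaded>, reqwest::Error> {
    let mut group = FuturesUnordered::new();
    for meta in files {
        let limit = Arc::clone(&limit);
        group.push(async move {
            // Every download in every group waits on the same semaphore,
            // so the number of simultaneous connections is capped globally.
            let _permit = limit.acquire_owned().await.expect("semaphore closed");
            let bytes = reqwest::get(meta.url.as_str()).await?.bytes().await?.to_vec();
            Ok::<_, reqwest::Error>(Downloaded { meta, bytes })
        });
    }

    let mut out = Vec::new();
    while let Some(res) = group.next().await {
        out.push(res?);
    }
    Ok(out)
}

// The A level shares one semaphore across all groups; each group's
// Result (and metadata) stays separate from the others.
async fn run_all(
    groups: Vec<Vec<FileMeta>>,
    max_in_flight: usize,
) -> Vec<Result<Vec<Downloaded>, reqwest::Error>> {
    let limit = Arc::new(Semaphore::new(max_in_flight));
    let mut tasks: FuturesUnordered<_> = groups
        .into_iter()
        .map(|files| run_group(files, Arc::clone(&limit)))
        .collect();

    let mut results = Vec::new();
    while let Some(res) = tasks.next().await {
        results.push(res);
    }
    results
}

With this shape, each B group still fails (or returns its metadata) independently, while the semaphore caps the number of simultaneous downloads globally. I just don't know whether keeping many futures parked on the semaphore like this has downsides I'm not seeing.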