I am building a tool that downloads a bunch of files from our backend server in parallel. My first naive attempt resulted in "Too many open files" (macOS has a default limit of 256 open files, which I don't want to exceed).
Then I asked this question, "Async: Best way to download many files without overloading the client and/or the server? (native & web/wasm)" (see reply #6 by bes), which helped me.
Now I have a similar problem: I have a buffer of tasks in a FuturesUnordered (let's call this A) that are executing in parallel, and each such task itself has a FuturesUnordered (this is B) that is running in parallel (bounded to n=32).
So A is a group of high-level tasks, e.g. resource group 1, 2, 3, etc., and B is the set of files to be downloaded in that resource group:
Root
└A1┐
   B1-download 1, 2, 3
   B2-...
   B3-...
└A2┐
   B4-...
   B5-...
...
My question is: does there exist a utility type or library to manage a global maximum of concurrent futures across all sub-FuturesUnordered?
My current idea is to pass around a tokio::sync::Semaphore, which each download task acquires and .awaits. I think this approach will solve the "Too many open files" problem, but will it cause other problems instead?
Something like this:
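use futures::stream::{FuturesUnordered, StreamExt};

async fn group_b(
    &self,
    // Wraps tokio::sync::Semaphore
    permit_fetcher: &dyn PermitFetcher,
) -> Result</**/> {
    let mut group = FuturesUnordered::new();
    for /* some loop */ {
        // Create a download
        group.push(async move {
            // The permit is held until `_permit` is dropped at the end of this block
            let _permit = permit_fetcher.acquire().await?;
            // Perform http call using reqwest (assuming api_call is async)
            let data = self.api_call(/**/).await;
            // The rest is not interesting
            ...
        });
    }
    // Drain the group, returning early on the first error
    while let Some(data_result) = group.next().await {
        let _ = data_result?;
    }
    Ok(/**/)
}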
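
To check the idea in isolation, the shared-limit behaviour can be tried without any network or async runtime. The sketch below is a std-only, thread-based stand-in and not the tokio version: `Limiter`, `Permit`, and `run_groups` are hypothetical names, threads take the place of futures, and a short sleep takes the place of the HTTP call. It only illustrates that a single limiter shared by every inner group caps the total number of in-flight downloads, regardless of how many groups exist.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Condvar, Mutex};
use std::thread;
use std::time::Duration;

/// Hypothetical counting semaphore built from std only
/// (a stand-in for tokio::sync::Semaphore in this sketch).
struct Limiter {
    free: Mutex<usize>,
    cv: Condvar,
}

/// Returns its permit to the limiter when dropped.
struct Permit<'a>(&'a Limiter);

impl Limiter {
    fn new(permits: usize) -> Self {
        Limiter { free: Mutex::new(permits), cv: Condvar::new() }
    }

    /// Blocks the calling thread until a permit is available.
    fn acquire(&self) -> Permit<'_> {
        let mut free = self.free.lock().unwrap();
        while *free == 0 {
            free = self.cv.wait(free).unwrap();
        }
        *free -= 1;
        Permit(self)
    }
}

impl<'a> Drop for Permit<'a> {
    fn drop(&mut self) {
        *self.0.free.lock().unwrap() += 1;
        self.0.cv.notify_one();
    }
}

/// Runs `groups` outer tasks (A), each with `per_group` fake downloads (B),
/// all sharing one limiter. Returns the peak number of simultaneous downloads.
fn run_groups(groups: usize, per_group: usize, max_in_flight: usize) -> usize {
    let limiter = Limiter::new(max_in_flight);
    let active = AtomicUsize::new(0);
    let peak = AtomicUsize::new(0);

    thread::scope(|outer| {
        for _ in 0..groups {
            outer.spawn(|| {
                thread::scope(|inner| {
                    for _ in 0..per_group {
                        inner.spawn(|| {
                            // Declared first, so it is dropped last: the permit
                            // is released only after the counter is decremented.
                            let _permit = limiter.acquire();
                            let now = active.fetch_add(1, Ordering::SeqCst) + 1;
                            peak.fetch_max(now, Ordering::SeqCst);
                            // Placeholder for the actual download.
                            thread::sleep(Duration::from_millis(5));
                            active.fetch_sub(1, Ordering::SeqCst);
                        });
                    }
                });
            });
        }
    });

    peak.load(Ordering::SeqCst)
}

fn main() {
    // 3 groups of 8 downloads each, with at most 4 in flight globally.
    let peak = run_groups(3, 8, 4);
    assert!(peak <= 4);
    println!("peak concurrency: {peak}");
}
```

In the async version the same shape would presumably apply with one tokio::sync::Semaphore shared by all groups (for example behind an Arc), where the permit has to stay alive until the response has been fully read, since dropping it earlier would release the slot while the connection is still open.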