I cannot understand what is going on here.
I limit the number of concurrent `file_digest` calls with a semaphore,
but I still get the error "Too many open files".
// Walks a directory tree, collects every regular file, and spawns one Tokio
// task per file to hash it. A 30-permit semaphore is intended to cap how many
// files are being hashed at the same time.
#[tokio::main]
async fn main() -> Result<(), Error> {
let aip_path =
Path::new("some path with many files");
// Gather all regular files up front; a directory-walk error aborts via `?`.
let mut paths = Vec::new();
for entry in WalkDir::new(aip_path) {
let entry = entry?;
if entry.file_type().is_file() {
paths.push(entry.path().to_path_buf());
}
}
let mut tasks = Vec::new();
let limit_io_semaphore = Arc::new(TokioSemaphore::new(30));
// Every file gets its own task immediately; the loop itself never waits.
for p in paths {
let limit_io_semaphore = limit_io_semaphore.clone();
tasks.push(tokio::spawn(async move {
// NOTE: `let _ = ...` does not bind the permit to a variable, so the permit is
// dropped (and handed back to the semaphore) at the end of this statement,
// before `file_digest` runs. The semaphore therefore limits nothing here.
let _ = limit_io_semaphore.acquire().await.unwrap();
file_digest(&p).await.unwrap();
}));
}
// Wait for all tasks; their results are collected but not inspected.
let existings_files = join_all(tasks).await;
Ok(())
}
But if I change the code like this:
// Variant that takes an owned permit inside the spawning loop.
let limit_io_semaphore = Arc::new(TokioSemaphore::new(30));
for p in paths {
let limit_io_semaphore = limit_io_semaphore.clone();
// Awaited before `tokio::spawn`, so the loop itself waits until a permit is free.
let permit = limit_io_semaphore.acquire_owned().await.unwrap();
tasks.push(tokio::spawn(async move {
file_digest(&p).await.unwrap();
// The permit was moved into the task and is released only after hashing.
drop(permit);
}));
}
everything works just fine.
But why? In both cases there should be the same number of files open at the same time: 30.
Why does the first variant cause the "Too many open files" error?
use futures_util::future::join_all;
use sha1::{Digest, Sha1};
use std::path::Path;
use std::sync::Arc;
use tokio::fs::File as TokioFile;
use tokio::io::AsyncReadExt;
use tokio::sync::Semaphore as TokioSemaphore;
use walkdir::WalkDir;
// Boxed catch-all error so `?` can propagate the different error types used below.
type Error = Box<dyn std::error::Error>;
#[tokio::main]
async fn main() -> Result<(), Error> {
let aip_path =
Path::new("dir with many files");
let mut paths = Vec::new();
for entry in WalkDir::new(aip_path) {
let entry = entry?;
if entry.file_type().is_file() {
paths.push(entry.path().to_path_buf());
}
}
let mut tasks = Vec::new();
let limit_io_semaphore = Arc::new(TokioSemaphore::new(30));
for p in paths {
let limit_io_semaphore = limit_io_semaphore.clone();
tasks.push(tokio::spawn(async move {
let _ = limit_io_semaphore.acquire().await.unwrap();
file_digest(&p).await.unwrap();
}));
}
let existings_files = join_all(tasks).await;
Ok(())
}
/// Streams the file at `path` through SHA-1 and returns the hasher state.
///
/// The file is read in chunks of up to 1 MiB until a read reports zero bytes.
/// The returned `Sha1` has absorbed the whole file but is not finalized.
///
/// # Errors
///
/// Returns an error if the file cannot be opened or a read fails.
async fn file_digest(path: &Path) -> Result<Sha1, Error> {
    const CHUNK_SIZE: usize = 1024 * 1024;

    let mut source = TokioFile::open(path).await?;
    let mut digest = Sha1::new();
    let mut chunk = vec![0_u8; CHUNK_SIZE];
    loop {
        match source.read(&mut chunk).await? {
            // A zero-length read marks end of file.
            0 => return Ok(digest),
            len => digest.update(&chunk[..len]),
        }
    }
}
`let _ = ...` immediately drops the value on the right-hand side of the statement. The reason `acquire_owned` works is that you are correctly holding on to the permit in that version.
Change it to `let _permit = ...` and it should work. Here the `_` at the beginning of the variable name just means "this isn't going to be used, so don't warn about it being unused, but it DOES need to stick around until the variable would normally be dropped", whereas a bare underscore means "I don't need this value at all".