Tokio semaphore mystery: acquire vs acquire_owned

I cannot understand what is going on here.
I limit the number of concurrent file_digest calls with a semaphore,
but I still get the error "Too many open files".

#[tokio::main]
async fn main() -> Result<(), Error> {
    let aip_path =
        Path::new("some path with many files");
    let mut paths = Vec::new();
    for entry in WalkDir::new(aip_path) {
        let entry = entry?;
        if entry.file_type().is_file() {
            paths.push(entry.path().to_path_buf());
        }
    }
    let mut tasks = Vec::new();
    let limit_io_semaphore = Arc::new(TokioSemaphore::new(30));
    for p in paths {
        let limit_io_semaphore = limit_io_semaphore.clone();
        tasks.push(tokio::spawn(async move {
            let _ = limit_io_semaphore.acquire().await.unwrap();
            file_digest(&p).await.unwrap();
        }));
    }
    let existings_files = join_all(tasks).await;
    Ok(())
}

But if I change the code like this:

    let limit_io_semaphore = Arc::new(TokioSemaphore::new(30));
    for p in paths {
        let limit_io_semaphore = limit_io_semaphore.clone();
        let permit = limit_io_semaphore.acquire_owned().await.unwrap();
        tasks.push(tokio::spawn(async move {
            file_digest(&p).await.unwrap();
            drop(permit);
        }));
    }

everything works just fine.

But why? In both cases, the same number of files should be open at the same time: 30.
Why does the first variant cause the "Too many open files" error?

Just in case, here is the full code:

use futures_util::future::join_all;
use sha1::{Digest, Sha1};
use std::path::Path;
use std::sync::Arc;
use tokio::fs::File as TokioFile;
use tokio::io::AsyncReadExt;
use tokio::sync::Semaphore as TokioSemaphore;
use walkdir::WalkDir;

type Error = Box<dyn std::error::Error>;

#[tokio::main]
async fn main() -> Result<(), Error> {
    let aip_path =
        Path::new("dir with many files");
    let mut paths = Vec::new();
    for entry in WalkDir::new(aip_path) {
        let entry = entry?;
        if entry.file_type().is_file() {
            paths.push(entry.path().to_path_buf());
        }
    }
    let mut tasks = Vec::new();
    let limit_io_semaphore = Arc::new(TokioSemaphore::new(30));
    for p in paths {
        let limit_io_semaphore = limit_io_semaphore.clone();
        tasks.push(tokio::spawn(async move {
            let _ = limit_io_semaphore.acquire().await.unwrap();
            file_digest(&p).await.unwrap();
        }));
    }
    let existings_files = join_all(tasks).await;
    Ok(())
}

async fn file_digest(path: &Path) -> Result<Sha1, Error> {
    let mut file = TokioFile::open(path).await?;
    let mut hasher = Sha1::new();
    let mut buf = vec![0_u8; 1024 * 1024];
    loop {
        let n = file.read(&mut buf).await?;
        if n == 0 {
            break;
        }
        hasher.update(&buf[0..n]);
    }
    Ok(hasher)
}

let _ = ... immediately drops the value on the right-hand side of the statement, so the permit is released before file_digest even starts and nothing limits how many files are open at once. The acquire_owned version works because it binds the permit to a named variable and holds it until the explicit drop(permit).

Change it to let _permit = ... and it should work. A leading underscore in a variable name just means "this isn't going to be used, so don't warn about it being unused, but it DOES need to stick around until the variable would normally be dropped". A bare underscore means "I don't need this value at all" and binds nothing.
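
To see the difference without Tokio, here is a minimal sketch using only the standard library. Guard and drop_order are hypothetical names invented for this illustration; Guard stands in for the semaphore permit and simply records when it is dropped.

```rust
use std::cell::RefCell;
use std::rc::Rc;

/// Hypothetical stand-in for a semaphore permit: logs when it is dropped.
struct Guard {
    name: &'static str,
    log: Rc<RefCell<Vec<String>>>,
}

impl Drop for Guard {
    fn drop(&mut self) {
        self.log.borrow_mut().push(format!("drop {}", self.name));
    }
}

/// Returns the order in which the guards are dropped relative to the "work".
fn drop_order() -> Vec<String> {
    let log = Rc::new(RefCell::new(Vec::new()));
    {
        // `_` is a pattern that binds nothing, so the guard is a temporary
        // that is dropped at the end of this statement.
        let _ = Guard { name: "underscore", log: Rc::clone(&log) };
        // `_named` is a real binding, so the guard lives until the end of
        // the enclosing block.
        let _named = Guard { name: "named", log: Rc::clone(&log) };
        log.borrow_mut().push("work".to_string());
    }
    let events = log.borrow().clone();
    events
}

fn main() {
    for event in drop_order() {
        println!("{event}");
    }
}
```

The guard assigned to _ is dropped before the "work" line is logged, while the one assigned to _named is dropped only after it. That mirrors the question: with let _ = the permit is already gone by the time file_digest opens its file.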

I see. This looks to me like really bad language design.
I never thought that _ and _var have different semantics.

It's definitely a gotcha. You can read more about it here and in the 2013 bug it links to (where the decision was made).
