Async closures are unstable

I am completely new to Rust, coming from mostly JS, C# and C++. I am trying to create a small search engine for my source code using Meilisearch.

For this purpose, I am trying to use the visit_dirs function defined in the read_dir example.

Now, the Meilisearch sdk has an async function called add_documents that has this signature:

pub async fn add_documents<T: Document>(
&'a self,
documents: &[T],
primary_key: Option<&str>,
) -> Result<Progress<'a>, Error>

I would like to call this from visit_dirs to index all the files found:

  let r = visit_dirs::visit_dirs(&base_path, &gitignore_file, &|dir_entry| {
    let contents =
      read_to_string(dir_entry.path()).expect("Something went wrong reading the file");

    indexed_files.add_documents(
      &[IndexedFile { contents }],
      Some("path"),
    ); // <-- I can't await here
  });

However, the closure is synchronous and if I try to mark it async, I am told that they are not stable. I tried adding #![feature(async_closure)] but I am then told that I can't do that on the stable release channel. Alright, I am willing to play with unstable but before I go there I want to make sure that I am not overlooking something simple.

What would be an approach that works on stable? Though I might want something fancier at some point, I would be happy to just index files serially for now. I don't think I want to create an array of futures that I return as that could contain many files and thus would explode memory consumption.

Since you are using the blocking function visit_dirs, which should not be called directly from async code because it blocks the executor thread, I assume you don't already have a runtime. In that case you can do:

let mut rt = tokio::runtime::Builder::new()
    .basic_scheduler()
    .enable_all()
    .build()
    .unwrap();

let r = visit_dirs::visit_dirs(&base_path, &gitignore_file, &|dir_entry| {
    let contents =
        read_to_string(dir_entry.path()).expect("Something went wrong reading the file");

    rt.block_on(async {
        indexed_files.add_documents(&[IndexedFile { contents }], Some("path")).await;
    });
});

If my assumption is wrong, then this will emit an error about you trying to start a runtime inside a runtime. In that case, you need another approach.

As for async closures, the stable alternative is || async { ... }, but that won't work in your case as visit_dirs is not really compatible with async/await.
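To illustrate the || async { ... } form on stable Rust, here is a minimal sketch that uses only the standard library. The small block_on helper and the index_len closure are hypothetical stand-ins (for a runtime's block_on and for an async call such as add_documents); they are not part of Tokio or the Meilisearch SDK.

```rust
use std::future::Future;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

// Hypothetical stand-in for a runtime: unparks the blocked thread
// whenever the future signals that it can make progress.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

// Minimal executor: polls a single future to completion on the current thread.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(value) => return value,
            Poll::Pending => thread::park(),
        }
    }
}

// An ordinary (non-async) closure that returns an async block.
// Calling it yields a future, which the synchronous caller drives with block_on.
fn index_lengths(docs: &[&str]) -> Vec<usize> {
    let index_len = |contents: String| async move { contents.len() };
    docs.iter()
        .map(|doc| block_on(index_len(doc.to_string())))
        .collect()
}
```

The closure itself compiles on stable because only the block it returns is async. The catch is that a synchronous caller such as visit_dirs has no way to await the returned future, so something like block_on is still needed inside the callback.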

Thank you for responding!

I do have a Tokio runtime in the outer scope (though not available as a variable that I know of -- I've just marked it #[tokio::main]), if that changes things?

BTW: disk I/O doesn't have truly async implementations in Rust. "async" disk I/O will spawn threads and use blocking I/O under the hood, and you're just paying overhead of making it look async.

Use async only for network I/O, and classic threads+blocking for disk I/O. If you need to mix the two, tokio runtime has ways of spawning blocking tasks for such things.
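As a rough sketch of the "threads + blocking" side of that advice, using only the standard library: a dedicated thread does the blocking reads and hands results back over a bounded channel, so only a few file contents are held in memory at once. The function name read_files_on_thread and the capacity of 4 are arbitrary choices for illustration.

```rust
use std::fs;
use std::io;
use std::path::PathBuf;
use std::sync::mpsc::{sync_channel, Receiver};
use std::thread;

// Reads each file on a dedicated thread with blocking I/O and returns the
// receiving end of a bounded channel that yields (path, read result) pairs.
fn read_files_on_thread(paths: Vec<PathBuf>) -> Receiver<(PathBuf, io::Result<String>)> {
    // The bound of 4 is an arbitrary illustrative value: the reader thread
    // blocks once four unread results are queued.
    let (tx, rx) = sync_channel(4);
    thread::spawn(move || {
        for path in paths {
            let contents = fs::read_to_string(&path);
            // Stop early if the receiver has been dropped.
            if tx.send((path, contents)).is_err() {
                break;
            }
        }
    });
    rx
}
```

The consumer can iterate over the returned receiver; the loop ends once the reader thread has finished and dropped its sender.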


Okay, interesting! I thought that was how any async implementation worked under the hood (well that or hardware interrupts, I guess)?

Anyway, thanks but as this is the SDK that Meilisearch offers I have no other option for now.
This is the function that I am trying to make work:

async fn watcher_main(client: &Client<'_>) {
  let indexed_files = client.get_or_create("indexed_files").await.unwrap();

  let path = env::current_dir().unwrap();
  let base_path = path.parent().unwrap();

  let gitignore_path = base_path.join(".gitignore");
  let (gitignore_file, ..) = Gitignore::new(&gitignore_path);

  let r = visit_dirs::visit_dirs(&base_path, &gitignore_file, &|dir_entry| {
    let ignored = gitignore_file.matched(&dir_entry.path(), false);
    if ignored.is_none() {
      let contents =
        read_to_string(dir_entry.path()).expect("Something went wrong reading the file");

      indexed_files.add_documents(
        &[IndexedFile {
          path: dir_entry.path().into_os_string().into_string().unwrap(),
          contents,
        }],
        Some("path"),
      ); // <- problem :-(
    }
  });
}

As you can see, it's async, being called from a main function that is marked with tokio::main so there is a runtime already. I can't figure out how to get a reference to that, which I guess would let me do what Alice suggests.

I see you posted on Reddit. Perhaps what you want is something like this:

async fn watcher_main(client: &Client<'_>) {
    let indexed_files = client.get_or_create("indexed_files").await.unwrap();

    let path = env::current_dir().unwrap();
    let base_path = path.parent().unwrap();

    let gitignore_path = base_path.join(".gitignore");
    let (gitignore_file, ..) = Gitignore::new(&gitignore_path);

    // use tokio::runtime::Handle;
    let handle = Handle::current();
    tokio::task::spawn_blocking(move || {
        visit_dirs::visit_dirs(&base_path, &gitignore_file, &|dir_entry| {
            let ignored = gitignore_file.matched(&dir_entry.path(), false);
            if ignored.is_none() {
                let contents =
                    read_to_string(dir_entry.path()).expect("Something went wrong reading the file");
    
                handle.block_on(indexed_files.add_documents(
                    &[IndexedFile {
                        path: dir_entry.path().into_os_string().into_string().unwrap(),
                        contents,
                    }],
                    Some("path"),
                ));
            }
        });
    }).await.unwrap();
}

Alternatively something like this might be better:

async fn watcher_main(client: &Client<'_>) {
    let indexed_files = client.get_or_create("indexed_files").await.unwrap();

    let path = env::current_dir().unwrap();
    let base_path = path.parent().unwrap();

    let gitignore_path = base_path.join(".gitignore");
    let (gitignore_file, ..) = Gitignore::new(&gitignore_path);
    
    // use tokio::sync::mpsc;
    // The receiver must be mutable because recv() takes &mut self.
    let (send, mut recv) = mpsc::unbounded_channel();

    let join = tokio::task::spawn_blocking(move || {
        visit_dirs::visit_dirs(&base_path, &gitignore_file, &|dir_entry| {
            let ignored = gitignore_file.matched(&dir_entry.path(), false);
            if ignored.is_none() {
                let contents = read_to_string(dir_entry.path())
                    .expect("Something went wrong reading the file");
    
                send.send(IndexedFile {
                    path: dir_entry.path().into_os_string().into_string().unwrap(),
                    contents,
                });
            }
        });
    });
    
    while let Some(indexed) = recv.recv().await {
        // watcher_main returns (), so `?` cannot be used here.
        indexed_files.add_documents(&[indexed], Some("path")).await.unwrap();
    }
    
    // Wait for the task to shut down.
    join.await.unwrap();
}

There may be some ownership problems with these since spawn_blocking cannot use non-owned values. If so, and if you can't fix it yourself, please post below.
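One common way around that kind of ownership problem is to convert borrowed values into owned ones before the move closure. The sketch below shows the idea with std::thread::spawn, which has a similar 'static requirement; the helper name file_name_on_thread is made up for illustration and the same pattern is assumed, not verified, to carry over to the code above.

```rust
use std::path::{Path, PathBuf};
use std::thread;

// Takes a borrowed path, clones it into an owned PathBuf, and moves the
// owned value into the spawned thread. Moving `base` itself would not
// compile, because the thread could outlive the borrow.
fn file_name_on_thread(base: &Path) -> Option<String> {
    let owned: PathBuf = base.to_path_buf();
    thread::spawn(move || {
        owned
            .file_name()
            .map(|name| name.to_string_lossy().into_owned())
    })
    .join()
    .expect("worker thread panicked")
}
```

In the snippets above, base_path is borrowed from path, so calling something like base_path.to_path_buf() before the closure would be the analogous step.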

Thank you, Alice! I hope posting on Reddit didn't come off as passive-aggressive; I am grateful for your help.
I'll try and see if I can get further with your suggestions 🙂