Passing async function to filter() for futures::stream Stream

I am playing with the futures and tokio crates.
I am trying to map over the directory stream from tokio::fs to build a simple file walker.
In the process I ran into a problem: no matter what I try, I cannot pass an async function or closure to futures::stream::StreamExt::filter in a way that satisfies its lifetime bounds.

How should I annotate the following code so that it compiles?

use futures::stream::StreamExt;
use std::io::Error;
use std::vec::Vec;
use tokio::fs::{read_dir, DirEntry};
use tokio::runtime::Runtime;
use std::path::Path;

async fn is_dir(dir: &DirEntry) -> bool {
    let ftype = dir.file_type().await.unwrap();
    ftype.is_dir()
}

async fn folder_scan(path: impl AsRef<Path>) -> Result<(), Error> {
    let directory_stream = read_dir(path).await?;
    let directories_in_folder: Vec<_> = directory_stream
        .filter_map(|dir| async move {
            if let Ok(normal) = dir {
                Some(normal)
            } else {
                None
            }
        })
        .filter(is_dir)
        .collect()
        .await;
    Ok(())
}

fn main() {
    let something = Runtime::new()
        .expect("Failed to create Tokio runtime")
        .block_on(folder_scan("/Users/tom/Downloads"));
}

I am sure I am missing something trivial, but I cannot work out what the compiler expects from me.


The compiler output only hints at a lifetime problem:
error[E0271]: type mismatch resolving for<'r> <for<'_> fn(&tokio::fs::read_dir::DirEntry) -> impl core::future::future::Future {is_dir} as std::ops::FnOnce<(&'r tokio::fs::read_dir::DirEntry,)>>::Output == _
  --> src/main.rs:23:10
   |
23 |         .filter(is_dir)
   |          ^^^^^^ expected bound lifetime parameter, found concrete lifetime

In this case, you can move your filter logic inside the async block for filter_map(), which simplifies things a bit.

This is one of the things that the unstable async closure feature will eventually fix. The issue is that filter only gives the closure a reference to the item, and the closure must return one single future type that cannot depend on that reference's lifetime, so the type system does not allow the async block to store the reference.

It's quite unfortunate really, but you would have to take ownership of the path, and move an owned path into the async block:

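.filter(|d| {
    // Copy the path out first so the future owns its data and holds no borrow of `d`.
    let path = d.path();
    // Assumes is_dir is rewritten to take an owned PathBuf instead of &DirEntry.
    async move { is_dir(path).await }
})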

(or you can do it in filter_map, which has no issues as filter_map is given ownership of the item)
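
For anyone who wants to see the restriction without pulling in futures or tokio, here is a minimal std-only sketch. The names keep_if, block_on, Entry and is_long are made up for this illustration. keep_if only imitates the shape of the bound on filter (a closure from &T to one fixed future type); it is not how the library implements it.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical toy executor: polls in a loop until the future is ready.
// Good enough here because nothing in this sketch ever returns Pending.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

// Same shape as the filter bound: `Fut` is a single type parameter, so it
// cannot mention the lifetime of the `&T` handed to each call of `pred`.
async fn keep_if<T, F, Fut>(items: Vec<T>, mut pred: F) -> Vec<T>
where
    F: FnMut(&T) -> Fut,
    Fut: Future<Output = bool>,
{
    let mut kept = Vec::new();
    for item in items {
        if pred(&item).await {
            kept.push(item);
        }
    }
    kept
}

#[derive(Debug, PartialEq)]
struct Entry {
    name: String,
}

// Takes owned data, so the returned future borrows nothing from the caller.
// An `async fn by_ref(e: &Entry) -> bool` passed directly to keep_if would be
// rejected, because its future type depends on the lifetime of `e`.
async fn is_long(name: String) -> bool {
    name.len() > 3
}

fn long_entries(entries: Vec<Entry>) -> Vec<Entry> {
    block_on(keep_if(entries, |e| {
        let name = e.name.clone(); // owned copy, taken before the async block
        async move { is_long(name).await }
    }))
}
```

Calling long_entries on entries named "ab" and "abcd" keeps only the second one. The closure copies the name out before building the future, which is the same move as let path = d.path() above.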


That does make sense. By the time the async function actually runs, the reference might point to a value that is long gone.

I ended up with something like this:

let directories_in_folder: Vec<_> = directory_stream
    .filter_map(|dir| async move { dir.ok() })
    .filter_map(|dir| async move {
        if is_dir(&dir).await {
            Some(dir)
        } else {
            None
        }
    })
    .collect()
    .await;

Well yes, you have to code it so the compiler can verify that the reference is not long gone, and the type system can't do that check right now.
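
A rough std-only sketch of why the filter_map version passes that check: the closure receives the item by value, so the future owns it and any borrow stays inside the future. keep_map, block_on, Entry and is_long are hypothetical names for this illustration; keep_map only mirrors the shape of the filter_map bound and is not the library's code.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical toy executor: polls in a loop until the future is ready.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

// Same shape as the filter_map bound: the closure takes `T` by value, so the
// future it returns can own the item and borrow from it internally.
async fn keep_map<T, U, F, Fut>(items: Vec<T>, mut f: F) -> Vec<U>
where
    F: FnMut(T) -> Fut,
    Fut: Future<Output = Option<U>>,
{
    let mut out = Vec::new();
    for item in items {
        if let Some(mapped) = f(item).await {
            out.push(mapped);
        }
    }
    out
}

#[derive(Debug, PartialEq)]
struct Entry {
    name: String,
}

// Borrows its argument, like is_dir(&DirEntry) in the question.
async fn is_long(e: &Entry) -> bool {
    e.name.len() > 3
}

fn long_entries(entries: Vec<Entry>) -> Vec<Entry> {
    block_on(keep_map(entries, |e| async move {
        // `e` lives inside this future, so borrowing it across the await is fine.
        if is_long(&e).await {
            Some(e)
        } else {
            None
        }
    }))
}
```

Here the by-reference is_long can be used unchanged, because the reference never has to escape the future that owns the entry.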
