I am experimenting with the futures crate and the Tokio runtime.
I am trying to map over the stream returned by tokio::fs::read_dir to build a simple file walker.
Along the way I hit a problem: no matter what I try, I cannot pass an async function or closure to futures::stream::StreamExt::filter in a way that satisfies its lifetime bounds.
How should I annotate the following code so that it compiles?
use futures::stream::StreamExt;
use std::io::Error;
use std::vec::Vec;
use tokio::fs::{read_dir, DirEntry};
use tokio::runtime::Runtime;
use std::path::Path;

async fn is_dir(dir: &DirEntry) -> bool {
    let ftype = dir.file_type().await.unwrap();
    ftype.is_dir()
}

async fn folder_scan(path: impl AsRef<Path>) -> Result<(), Error> {
    let directory_stream = read_dir(path).await?;
    let directories_in_folder: Vec<_> = directory_stream
        .filter_map(|dir| async move {
            if let Ok(normal) = dir {
                Some(normal)
            } else {
                None
            }
        })
        .filter(is_dir)
        .collect()
        .await;
    Ok(())
}

fn main() {
    let something = Runtime::new()
        .expect("Failed to create Tokio runtime")
        .block_on(folder_scan("/Users/tom/Downloads"));
}
I am sure I am missing something trivial, but I cannot work out what the compiler expects from me.
This is one of the things that the unstable async closure feature will eventually fix. The issue is that filter only gives the closure a reference to the item, and the future the closure returns must be a single type that cannot depend on the lifetime of that reference, so the type system does not allow the async block to store it.
It's quite unfortunate really, but you would have to take ownership of the path, and move an owned path into the async block:
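In the question's code this would roughly mean replacing `.filter(is_dir)` with a closure that calls `dir.path()` first and then returns an `async move` block that only uses the owned path. The sketch below shows that shape using only the standard library, so it is not the tokio code itself: `Entry`, `filter_async`, `block_on` and `directories_only` are hypothetical stand-ins, `filter_async` merely copies the `FnMut(&T) -> Fut` predicate bound that `StreamExt::filter` uses, and the blocking `std::fs::metadata` call stands in for an async lookup.

```rust
use std::future::Future;
use std::path::PathBuf;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for `tokio::fs::DirEntry`.
pub struct Entry {
    pub path: PathBuf,
}

impl Entry {
    /// Returns an owned path, as `DirEntry::path()` does.
    pub fn path(&self) -> PathBuf {
        self.path.clone()
    }
}

/// Takes the path by value, so the returned future borrows nothing.
/// Assumption: blocking `std::fs::metadata` stands in for an async lookup.
pub async fn is_dir(path: PathBuf) -> bool {
    std::fs::metadata(&path).map(|m| m.is_dir()).unwrap_or(false)
}

/// Hypothetical helper with the same predicate shape as `StreamExt::filter`:
/// the predicate sees `&T`, but `Fut` is one type that cannot borrow from it.
pub async fn filter_async<T, F, Fut>(items: Vec<T>, mut pred: F) -> Vec<T>
where
    F: FnMut(&T) -> Fut,
    Fut: Future<Output = bool>,
{
    let mut kept = Vec::new();
    for item in items {
        if pred(&item).await {
            kept.push(item);
        }
    }
    kept
}

/// Waker that does nothing; enough for futures that never wait on I/O.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Minimal executor for this sketch only: polls in a loop until ready.
pub fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

/// Keeps only the entries whose path is a directory.
pub fn directories_only(entries: Vec<Entry>) -> Vec<Entry> {
    block_on(filter_async(entries, |entry: &Entry| {
        // Take ownership of the path before creating the async block...
        let path = entry.path();
        // ...so the block captures the owned `PathBuf`, not `entry`.
        async move { is_dir(path).await }
    }))
}
```

The important part is the closure in `directories_only`: the borrow of `entry` ends before the `async move` block is created, so the future owns everything it needs. Passing a function like the question's `is_dir(dir: &DirEntry)` directly does not fit the bound, because its future would have to hold the reference.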