How to filter an async stream

What's the best way to filter an async stream? I find the types confusing.
For example, take the stream returned by read_dir.

I have a predicate that I want to apply to the stream like this:

async fn is_match(p: impl AsRef<Path>) -> bool

Then I want to collect the results into a Vec<PathBuf>.
So the function I want to write is this:

async fn read_dir_filtered<P: AsRef<Path>>(path: P) -> Result<Vec<PathBuf>>

which in turn needs to use is_match.
Any ideas?

If you just need to create a vector, you can adapt the example from the read_dir documentation:

use async_std::fs;
use async_std::prelude::*;

let mut entries = fs::read_dir(".").await?;

while let Some(res) = entries.next().await {
    let entry = res?;
    println!("{}", entry.file_name().to_string_lossy());
}

Just add an if inside the loop to skip the entries you don't want, and push the rest into a vector.

Thanks for your reply @alice! Your suggestion is valid.
For completeness: I also tried to get it working with filter_map, and this is what now seems to work for me:

use anyhow::{Context, Result};
use async_std::{
    fs,
    path::{Path, PathBuf},
};
use futures::stream::{Stream, StreamExt};
use std::ffi::OsStr;

async fn read_dir_filtered<P: AsRef<Path>>(
    ending: &'static str,
    path: P,
) -> Result<std::pin::Pin<Box<dyn Stream<Item = PathBuf>>>> {
    let dir_stream = fs::read_dir(&path)
        .await
        .with_context(|| format!("failed to read: {:?}", path.as_ref()))?;
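    // Keep only entries whose extension equals `ending`.
    // Entries that fail to read are skipped silently.
    Ok(Box::pin(dir_stream.filter_map(move |d| async move {
        match d {
            // `DirEntry::path()` already returns an owned PathBuf.
            Ok(rr) if path_has_ending(rr.path(), ending) => Some(rr.path()),
            _ => None,
        }
    })))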
}

// `extension()` excludes the leading dot, so pass "rs" rather than ".rs".
fn path_has_ending(p: impl AsRef<Path>, ending: &str) -> bool {
    match p.as_ref().extension().and_then(OsStr::to_str) {
        Some(ext) => ending == ext,
        _ => false,
    }
}