I have example code that lists the files in a directory; it uses tokio.
I thought the three threads (`list_directory`, `process_listed`, and `main`) would run in parallel.
But the output shows that the `main` thread is not running as I expected: `list_directory` runs far more frequently than the other threads.
Why does this happen? Thanks.
use std::path::PathBuf;
use tokio::{fs::read_dir, spawn, sync::mpsc::*};
/// Wires up the three pipeline stages and consumes the final results.
///
/// Channels (each bounded to 512 messages):
/// - `send_l` / `recv_l`: directories that `list_directory` should enumerate.
/// - `send_p` / `recv_p`: entries found by `list_directory`, read by `process_listed`.
/// - `send_m` / `recv_m`: non-directory paths that `process_listed` forwards here.
///
/// The consumer loop stops on the first `None` message or when `recv_m` is closed.
///
/// # Errors
///
/// Returns an error if either worker task panics or finishes with an error.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let (send_m, mut recv_m) = channel(512);
    let (send_l, recv_l) = channel(512);
    let (send_p, recv_p) = channel(512);
    let path = "/usr/share/perl/5.30/unicore/";

    // Keep the join handles so worker failures are reported from `main` instead
    // of being lost in a detached task. `Box<dyn Error>` is not `Send`, so the
    // error is turned into a `String` before it leaves the task.
    let lister = spawn(async move {
        list_directory(recv_l, send_p)
            .await
            .map_err(|err| err.to_string())
    });
    let processor = spawn(async move {
        process_listed(recv_p, send_l, send_m, vec![PathBuf::from(path)])
            .await
            .map_err(|err| err.to_string())
    });

    while let Some(Some(path)) = recv_m.recv().await {
        eprintln!("get task:: {:?}", path);
        // process path, such as copy to somewhere
        // NOTE(review): tokio documents its timer as millisecond-granular, so this
        // probably waits about 1 ms per item rather than 100 us, which would make
        // this loop the slowest stage of the pipeline. Confirm against the tokio docs.
        tokio::time::sleep(std::time::Duration::from_micros(100)).await;
    }

    // First `?` surfaces a panicked/cancelled task, second `?` the task's own error.
    lister.await??;
    processor.await??;
    Ok(())
}
pub async fn list_directory(
mut dir: Receiver<Option<PathBuf>>,
send: Sender<Option<PathBuf>>,
) -> Result<(), Box<dyn std::error::Error>> {
while let Some(Some(path)) = dir.recv().await {
eprintln!("list directory:: rece {:?} ...", path);
if path.is_dir() {
let mut read = read_dir(path).await?;
while let Some(item) = read.next_entry().await? {
eprintln!("list directory:: send item {:?} ...", item);
send.send(Some(item.path())).await?;
}
} else {
send.send(Some(PathBuf::from(path))).await?;
}
send.send(None).await?;
}
eprintln!("list directory:: ending ...");
Ok(())
}
pub async fn process_listed(
mut path: Receiver<Option<PathBuf>>,
list_send: Sender<Option<PathBuf>>,
task_send: Sender<Option<PathBuf>>,
paths: Vec<PathBuf>,
) -> Result<(), Box<dyn std::error::Error>> {
let mut count = paths.len();
for path in paths {
list_send.send(Some(path)).await?;
}
while count > 0 {
while let Some(path) = path.recv().await {
match path {
Some(path) => {
if path.is_dir() {
eprintln!("process task:: send to list directory {:?}", path);
list_send.send(Some(path)).await?;
count += 1;
eprintln!("........ count = {count}");
} else {
eprintln!("process task:: send to main thread {:?}", path);
task_send.send(Some(path)).await?;
}
}
None => {
count -= 1;
eprintln!("........ count = {count}");
if count == 0 {
break;
}
}
}
}
}
eprintln!("process task:: sending None to main thread");
list_send.send(None).await?;
task_send.send(None).await?;
Ok(())
}