Trouble using tokio MPSC channel

I have some example code that lists the files in a directory using tokio.

playground

I thought the three tasks (list_directory, process_listed and main) would run in parallel.

But the output shows that the main thread is not running as I expected: list_directory runs much more often than the other tasks.

Why does this happen? Thanks.

use std::path::PathBuf;

use tokio::{fs::read_dir, spawn, sync::mpsc::*};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let (send_m, mut recv_m) = channel(512);
    let (send_l, recv_l) = channel(512);
    let (send_p, recv_p) = channel(512);
    let path = "/usr/share/perl/5.30/unicore/";

    spawn(async move {
        list_directory(recv_l, send_p).await.unwrap();
    });
    spawn(async move {
        process_listed(recv_p, send_l, send_m, vec![PathBuf::from(path)])
            .await
            .unwrap();
    });
    while let Some(Some(path)) = recv_m.recv().await {
        eprintln!("get task:: {:?}", path);
        // process path, such as copy to somewhere
        tokio::time::sleep(std::time::Duration::from_micros(100)).await;
    }
    Ok(())
}

pub async fn list_directory(
    mut dir: Receiver<Option<PathBuf>>,
    send: Sender<Option<PathBuf>>,
) -> Result<(), Box<dyn std::error::Error>> {
    while let Some(Some(path)) = dir.recv().await {
        eprintln!("list directory:: rece {:?} ...", path);
        if path.is_dir() {
            let mut read = read_dir(path).await?;

            while let Some(item) = read.next_entry().await? {
                eprintln!("list directory:: send item {:?} ...", item);
                send.send(Some(item.path())).await?;
            }
        } else {
            send.send(Some(path)).await?;
        }
        send.send(None).await?;
    }
    eprintln!("list directory:: ending ...");
    Ok(())
}

pub async fn process_listed(
    mut path: Receiver<Option<PathBuf>>,
    list_send: Sender<Option<PathBuf>>,
    task_send: Sender<Option<PathBuf>>,
    paths: Vec<PathBuf>,
) -> Result<(), Box<dyn std::error::Error>> {
    let mut count = paths.len();

    for path in paths {
        list_send.send(Some(path)).await?;
    }
    while count > 0 {
        while let Some(path) = path.recv().await {
            match path {
                Some(path) => {
                    if path.is_dir() {
                        eprintln!("process task:: send to list directory {:?}", path);
                        list_send.send(Some(path)).await?;
                        count += 1;
                        eprintln!("........ count = {count}");
                    } else {
                        eprintln!("process task:: send to main thread {:?}", path);
                        task_send.send(Some(path)).await?;
                    }
                }
                None => {
                    count -= 1;
                    eprintln!("........ count = {count}");
                    if count == 0 {
                        break;
                    }
                }
            }
        }
    }
    eprintln!("process task:: sending None to main thread");
    list_send.send(None).await?;
    task_send.send(None).await?;
    Ok(())
}

Your channel buffer is rather large, and the other two tasks aren't doing much "real" work. As a result, they finish their work really fast, throw all their results into the channel, and leave you waiting on the main task.

So you should either:

  1. Shrink your channel buffer, which makes your other tasks wait for the main task. This doesn't really speed anything up, as your bottleneck is still there.

  2. Spawn a task for each item received in the main task. This makes the "main" work run concurrently, so it will most likely speed things up.


Thanks for the reply.

  1. I changed the channel buffer size to 5 and the code gets stuck. I also noticed that the main thread only received 3 items from recv_m. Why?

  2. I don't see much difference when I spawn the logic in another thread.

The issue seems to be solved: the main thread is treated differently and does not run as a worker. See Runtime in tokio::runtime - Rust.
