My code uses Stream and the tokio multi-threaded runtime, but only a single thread ever does any work

My code is built from Stream combinators on the tokio multi-threaded runtime, but I observe that only a single thread ever does the work:



use std::{io::BufWriter, path::PathBuf, thread};

use anyhow::Ok;
use bert::pretrain::Record;
use futures::{stream, StreamExt};
use tokio::fs::{self, remove_file, File};

// cargo run --example gen-csvs --features="pretrain"
#[tokio::main(flavor = "multi_thread", worker_threads = 10)]
async fn main() -> anyhow::Result<()> {
    let root = &PathBuf::from("./target/gen-csvs");
    // create the directory if it doesn't exist
    if !root.exists() {
        fs::create_dir_all(&root).await?;
    }

    const FILES: u64 = 100;
    const SIZE_OF_FILE: u64 = 10;
    const FLUSH_CAPACITY_OF_WRITE_ONE_FILE: u64 = 100;
    const CONCURRENT_FILES: u64 = 20;

    let gener = Box::pin(stream::iter(0..FILES))
    .flat_map_unordered(CONCURRENT_FILES as usize, |f| {
        let thread_id1 = thread::current().id();
        let thread_name1 = thread::current().name().unwrap_or("unnamed_1").to_owned();
        let ws = async_stream::stream! {
            let path = &root.join(format!("text-{}.csv", f));
            if path.exists() {
                remove_file(&path).await?;
            }

            yield {
                let csv = File::create(path).await?;
                let mut wtr = csv::Writer::from_writer(BufWriter::new(csv.into_std().await));

                let current_thread_id = thread::current().id();
                let current_thread_name = thread::current().name().unwrap_or("unnamed_1").to_owned();
                println!("write file: {}, current thread = {current_thread_id:?}/{current_thread_name} < {thread_id1:?}/{thread_name1}", path.display());

                for i in 0..SIZE_OF_FILE {
                    let s = "xxxxxx";
                    wtr.serialize(Record::new(Some(f * SIZE_OF_FILE + i), s.to_owned(), 1))?;
                    if i % FLUSH_CAPACITY_OF_WRITE_ONE_FILE == 0 {
                        wtr.flush()?;
                    }
                }
                wtr.flush()?;
                Ok(())
            };
        };
        Box::pin(ws)
    })
    .for_each_concurrent(CONCURRENT_FILES as usize, |r| async move{
        // let current_thread_id = thread::current().id();
        // let current_thread_name = thread::current().name().unwrap_or("unnamed_1").to_owned();
        // println!("for_each_concurrent:, current thread = {current_thread_id:?}/{current_thread_name}");

        match r {
            Result::Ok(_) => {},
            Err(e) => {
                println!("error: {:?}", e);
            }
        }
    });

    gener.await;

    Ok(())
}

I don't see you ever spawning a task anywhere. If you don't spawn tasks, tokio will never use any of the worker threads.
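
For illustration, here is a minimal sketch (not the original program; the structure and names are made up) of spawning one task per file and limiting concurrency with buffer_unordered, so the multi-threaded runtime can actually schedule the work across its worker threads:

use futures::{stream, StreamExt};

#[tokio::main(flavor = "multi_thread", worker_threads = 10)]
async fn main() {
    const FILES: u64 = 100;
    const CONCURRENT_FILES: usize = 20;

    stream::iter(0..FILES)
        .map(|f| {
            // Each file becomes its own task, which tokio is free to run on any worker thread.
            tokio::spawn(async move {
                println!("file {f} on {:?}", std::thread::current().id());
                // ... build and write the CSV for file `f` here ...
            })
        })
        // At most CONCURRENT_FILES spawned tasks are in flight at a time.
        .buffer_unordered(CONCURRENT_FILES)
        .for_each(|res| async move {
            if let Err(e) = res {
                eprintln!("task panicked: {e}");
            }
        })
        .await;
}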

Also note that there is a footgun lurking in methods like StreamExt::for_each_concurrent.


This isn't a performance footgun - it's a deadlock footgun: code that looks reasonable can, depending on runtime conditions, deadlock completely.


Huh, I previously read your issue solely as a problem with unexpected long polls of the sub-executor, not as an issue with deadlocks. I had completely ignored your second point concerning the problem with async semaphores, and hence types like tokio's Mutex that use them under the hood. I have amended my answer.


That's what makes it such a nasty footgun: at first glance, it's "just" a performance issue. At second glance, you see the timeout problems, where things fail even on a lightly loaded system. And at third glance, you see the deadlock issue.
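
To make the deadlock concrete, here is a small, made-up illustration (not code from this thread): a body that fills the concurrency limit waits for a signal that is only sent when the stream produces its next item, but for_each_concurrent stops polling the stream while the limit is reached, so the program hangs:

use futures::StreamExt;
use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
    let (tx, rx) = oneshot::channel::<()>();
    let mut rx = Some(rx);

    // The signal is only sent while the stream is producing its second item.
    let items = async_stream::stream! {
        yield 0u32;
        let _ = tx.send(()); // never runs: the stream is not polled again
        yield 1u32;
    };

    items
        .for_each_concurrent(1, |i| {
            let rx = if i == 0 { rx.take() } else { None };
            async move {
                if let Some(rx) = rx {
                    // This body fills the concurrency limit, so for_each_concurrent
                    // stops polling the stream; the signal it waits for never arrives.
                    let _ = rx.await;
                }
            }
        })
        .await;

    println!("never reached");
}

The semaphore/Mutex variant mentioned above has the same shape: a body waits for a permit or lock whose release depends on a future that the combinator is no longer polling.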


This code is blocking the thread by using non-async file operations: after csv.into_std().await, the BufWriter, csv::Writer::serialize, and flush calls all do synchronous I/O, so the worker thread can't run anything else while a file is being written.

I recommend writing your CSV to an in-memory buffer and then writing the entire file using tokio::fs::write. See the module documentation for tokio::fs for more suggestions on file I/O.
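
A minimal sketch of that approach, reusing the Record type from the question (the helper name and parameters are made up for illustration):

use bert::pretrain::Record;
use std::path::Path;

// Hypothetical helper: build the whole CSV in memory, then hand it to tokio in one async write.
async fn write_one_file(path: &Path, f: u64, rows: u64) -> anyhow::Result<()> {
    let mut buf = Vec::new();
    {
        // csv::Writer is synchronous, but writing into a Vec never touches the disk,
        // so nothing in this block blocks on I/O.
        let mut wtr = csv::Writer::from_writer(&mut buf);
        for i in 0..rows {
            wtr.serialize(Record::new(Some(f * rows + i), "xxxxxx".to_owned(), 1))?;
        }
        wtr.flush()?;
    }
    // One async call writes the finished buffer; tokio runs the actual I/O on its blocking pool.
    tokio::fs::write(path, buf).await?;
    Ok(())
}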

Alternatively, you can use one of the async CSV libraries.

