How to correctly process many files in parallel with tokio?

Hi there,

I am new to Rust, and for a data-processing task I want to implement the following:

  1. Read a list of 500 to 25_000 zip archives
  2. For each zip archive, extract two of the four CSV files in there
  3. Process the two CSV files

Some of the zip archives are only a few kilobytes in size. The larger ones are 300MB.

I have the basic process working fine with Rust, and it is much faster than the Python version.
With Rayon, I could easily process all the zip archives in parallel and get a 4-6x speed up with only tiny code changes.

Now I have been looking at Tokio: I want to launch tasks in the background and then wait for them to finish. For reading, I use the async_zip crate:

async fn process_pattern(file_path: String) -> tokio::io::Result<()> {
    // ...
}

... 
    for l in &csv_names {
        let file_path = format!("{}{}", csv_base_dir, l);
        process_pattern(file_path).await?;
    }

This takes a huge amount of time to run.

I have read through the documentation, found task spawning, and changed the loop:

let handle = tokio::spawn(
    async move { process_pattern(file_path) }
);

Now it runs super fast, but does not do anything. Even when joining all the tasks via "futures::future::join_all(handles).await;" it does not do anything. I also tried it with a JoinSet from Tokio.

How do I get this to work with Tokio?

The async block yields a future, which will be correctly spawned as a task and executed. But the task only drops the future returned by process_pattern().

You need to await the future instead of dropping it:

let handle = tokio::spawn(async move {
    process_pattern(file_path).await
});

Or equivalently, remove the async block:

let handle = tokio::spawn(process_pattern(file_path));

Your code should also have produced a compiler warning about an unused Future.
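Putting it together, a minimal sketch of the corrected loop, using the same csv_names and csv_base_dir from your snippet:

let mut handles = Vec::new();
for l in &csv_names {
    let file_path = format!("{}{}", csv_base_dir, l);
    // Each task starts running in the background as soon as it is spawned.
    handles.push(tokio::spawn(process_pattern(file_path)));
}

// Wait for every task: expect() surfaces panics, ? surfaces I/O errors.
for handle in handles {
    handle.await.expect("task panicked")?;
}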

But anyway, Rayon is a better choice for this task. In the best-case scenario, Tokio would use a thread pool to read the files (just like Rayon), and you would use spawn_blocking for the parsing, which runs on yet another thread pool (again like Rayon), with the async glue code between them being only unnecessary overhead and complication.
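For comparison, a sketch of what the Rayon version could look like; process_pattern_blocking here is a hypothetical synchronous rewrite of process_pattern using std::fs and blocking zip/CSV crates:

use rayon::prelude::*;

// Hypothetical synchronous counterpart of process_pattern().
fn process_pattern_blocking(file_path: String) -> std::io::Result<()> {
    // ... open the zip, extract and process the two CSVs, all blocking ...
    Ok(())
}

// Rayon schedules the iterations across its own thread pool.
csv_names
    .par_iter()
    .try_for_each(|l| process_pattern_blocking(format!("{}{}", csv_base_dir, l)))?;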


This is what I had first:

let handle = tokio::spawn(process_pattern(file_path));

but run like that, it takes 3x longer on my 32-core machine than the basic single-threaded version without Tokio. That time difference is why I thought I was doing something very wrong with Tokio.

Which flavour of Tokio's runtime are you running?

Without the .await, it wasn't running process_pattern, so doing nothing could have been much faster than doing something 🙂

Rust's async fn calls don't do any work by themselves. Nothing happens when you call an async function (this is quite the opposite of a JS Promise). The Future starts out paused and doesn't make any progress until it is polled by .await. Without .await, the Future is discarded and will usually be optimized away to nothing, as if the function had never been called.
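A tiny self-contained demonstration of that laziness:

async fn work() {
    println!("running");
}

#[tokio::main]
async fn main() {
    let fut = work(); // prints nothing: the future hasn't been polled yet
    fut.await;        // only now is "running" printed
}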


I am very new to Tokio:
Cargo.toml has this:

tokio = { version = "1", features = ["full"] }

and then it is this:

#[tokio::main]
async fn main() -> tokio::io::Result<()> {
... 
}

That is all I have there. From the docs, it defaults to the multi-thread scheduler, yes?

Yes, exactly. I was just checking that you were not accidentally using the single-threaded runtime.

Probably what is going wrong with Tokio is that process_pattern() is not yielding to the scheduler often enough. If decompressing the archive, deserializing the CSV, or "processing" the data (whatever that entails) takes too long, you will have a really bad time with Tokio.


The code in process_pattern() has await points at:

  1. Reading the zip file
  2. Reading each of the two CSV files via read_to_end(), which reads the whole file in one go

From your description, would reading the files in chunks be beneficial, since it would add more await points?
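Something like this sketch is what I have in mind, with an arbitrary 64 KiB chunk size:

use tokio::io::AsyncReadExt;

let mut file = tokio::fs::File::open(&file_path).await?;
let mut buf = vec![0u8; 64 * 1024];
loop {
    let n = file.read(&mut buf).await?; // each chunk is its own await point
    if n == 0 {
        break; // end of file
    }
    // ... feed buf[..n] into the decompression/parsing ...
}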

The processing has no await points yet, as even with it bypassed, the runtime of the two steps above is very slow.

The processing is:

  1. Read the first CSV to get the columns that need to be processed in the second step
  2. For each of those columns, go through all the rows to find the maximum value, then go through them again to find all values >= 80% of that maximum (a rough sketch of this step follows).
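A minimal sketch of that second step, assuming a column has already been parsed into a Vec<f64> (my real types may differ):

// Find the maximum of a column, then keep every value >= 80% of it.
fn high_values(column: &[f64]) -> Vec<f64> {
    let max = column.iter().copied().fold(f64::MIN, f64::max);
    let threshold = 0.8 * max;
    column.iter().copied().filter(|&v| v >= threshold).collect()
}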

Reading should be non-blocking, since that is I/O. The exception is when reading and processing are already interleaved in a streaming manner (see the "long-winded details" below for what to do in that scenario). If the reading occurs in one step with await, you should be good.

The one thing I can think of which might not be adequate is whether file-system I/O is truly async, or whether it depends on a thread pool that may be too small for your use case. tokio::fs uses the thread-pool model. This comment from tokio::fs::read explains the situation:

This operation is implemented by running the equivalent blocking operation on a separate thread pool using spawn_blocking.

The size of that separate thread pool is important: any size smaller than the total number of files you are reading will hurt overall throughput, as all threads will be busy waiting on I/O and a queue of files will be waiting for a thread to become free before they can begin reading.

The CPU-bound tasks and blocking code section in the Tokio docs offers a workaround to the problem: the TOKIO_WORKER_THREADS environment variable (the spawn_blocking pool itself is capped via the runtime Builder's max_blocking_threads).
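For completeness, a sketch of setting those sizes explicitly through the runtime Builder instead of #[tokio::main]; the numbers are placeholders, not recommendations:

fn main() -> tokio::io::Result<()> {
    let rt = tokio::runtime::Builder::new_multi_thread()
        .worker_threads(32)         // async worker threads
        .max_blocking_threads(512)  // cap for the spawn_blocking pool
        .enable_all()
        .build()?;
    rt.block_on(async {
        // ... spawn and await the tasks here ...
        Ok(())
    })
}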

Overall, the real solution has already been provided in the comment above recommending Rayon. The rest of the discussion has just been about how you might be able to fake parallelism by increasing the granularity of cooperative concurrency with Tokio.

Long-winded details on async streaming with blocking code: it's the CPU-intensive parts that will not yield unless explicitly designed to. The decompression and deserialization steps need to yield as well, but this "async streaming" capability is not normally seen in compression or serialization crates.

To illustrate: if you are using the csv crate, you need to be aware that it deserializes without yielding. You can insert explicit yields while iterating over rows (sketched after the caveats below), but there are two caveats:

  1. It requires discipline on your part. No tool that I am aware of will help automate detecting or fixing blocking (non-yielding) code in async contexts.
  2. Yielding more frequently than truly necessary can have more overhead than just running normal blocking code on its own dedicated thread. So this is not only context-sensitive but also very situational: in some cases you might want to yield only in outer loops, while in others it may be advantageous to yield inside inner loops.
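Such an explicit yield might look like this sketch, assuming the decompressed CSV bytes are already in memory as csv_bytes: Vec<u8>; the yield interval (1024) is arbitrary:

use tokio::task::yield_now;

async fn scan_rows(csv_bytes: Vec<u8>) -> Result<(), csv::Error> {
    let mut rdr = csv::Reader::from_reader(csv_bytes.as_slice());
    for (i, result) in rdr.records().enumerate() {
        let record = result?;
        // ... process `record` here ...
        if i % 1024 == 0 {
            yield_now().await; // hand control back to the scheduler
        }
    }
    Ok(())
}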

Thank you for taking the time and explaining this to me!
