I am new to Rust and for a data processing task, I want to implement this:
Read a list of 500 to 25_000 zip archives
For each zip archive, extract two of the four CSV files in there
Process the two CSV files
Some of the zip archives are only a few kilobytes in size. The larger ones are 300MB.
I have the basic process working fine with Rust, and it is much faster than the Python version.
With Rayon, I could easily process all the zip archives in parallel and get a 4-6x speed up with only tiny code changes.
Now I have been looking at Tokio. I wanted to launch tasks in the background and then wait for them to finish. For reading I use the async_zip crate:
async fn process_pattern(file_path: String) -> tokio::io::Result<()> {
    // ...
}
...
for l in &csv_names {
    let file_path = format!("{}{}", csv_base_dir, l);
    process_pattern(file_path).await?;
}
This takes a huge amount of time to run.
I read through the documentation, found task spawning, and changed the loop:
let handle = tokio::spawn(
    async move { process_pattern(file_path) }
);
Now it runs super fast - but does not do anything. Even when joining all the tasks via "futures::future::join_all(handles).await;" it does not do anything. I also tried it with a JoinSet from Tokio.
The async block yields a future, which is correctly spawned as a task and executed. But that task only calls process_pattern() and then drops the future it returns, without ever polling it. You need to await the future instead of dropping it:
let handle = tokio::spawn(async move {
    process_pattern(file_path).await
});
Or equivalently, remove the async block:
let handle = tokio::spawn(process_pattern(file_path));
Your code should also produce a compiler warning about an unused Future, since futures are marked #[must_use].
But anyway, Rayon is a better choice for this task. In the best case, Tokio would use a thread pool to read the files (just like Rayon), and you should use spawn_blocking for parsing, which is yet another thread pool (like Rayon); the async glue code between them is only unnecessary overhead and complication.
let handle = tokio::spawn(process_pattern(file_path));
But running like that takes 3x longer on my 32-core machine than the basic single-threaded version without Tokio. That time difference is why I thought I was doing something very wrong with Tokio.
Without the .await it wasn't running process_pattern at all, so doing nothing could have been much faster than doing something.
Rust's async fn calls don't do any work by themselves. Nothing happens when you call an async function (quite the opposite of a JS Promise, which starts executing eagerly). The returned Future starts out paused and makes no progress until it is polled via .await. Without .await the Future is simply discarded, and this usually optimizes down to nothing, as if the function was never called.
What is probably going wrong with Tokio is that process_pattern() is not yielding to the scheduler often enough. If decompressing the archive, deserializing the CSV, or "processing" the data (whatever that entails) takes too long without yielding, you will have a really bad time with Tokio.
Reading should be non-blocking, since that is I/O. The exception is when reading and processing is already interleaved in a streaming manner (see "long-winded details" below for what to do about this scenario). If the reading occurs in one step with await, you should be good.
The one thing I can think of which might not be adequate is whether file-system I/O is truly async, or whether it depends on a thread pool that may be too small for your use case. tokio::fs uses the thread-pool model; this comment from tokio::fs::read explains the situation:
This operation is implemented by running the equivalent blocking operation on a separate thread pool using spawn_blocking.
The size of that separate thread pool matters: any size smaller than the number of files you are reading concurrently will hurt overall throughput, because all threads will be busy waiting on I/O and queued files must wait for a thread to become free before reading can begin.
The "CPU-bound tasks and blocking code" section in the Tokio docs offers a workaround: the TOKIO_WORKER_THREADS environment variable. (The blocking pool itself is capped separately via Builder::max_blocking_threads, which defaults to 512 threads.)
Overall, the real solution has already been given in this comment. The rest of the discussion has just been about how you might fake parallelism by increasing the granularity of cooperative concurrency within Tokio.
Long-winded details on async streaming with blocking code...
It's the CPU-intensive parts that will not yield unless explicitly designed to. The decompression and deserialization steps need to yield as well, but this kind of "async streaming" capability is not commonly found in compression or serialization crates.
To illustrate, if you are using the csv crate, then you need to be aware that it deserializes without yielding. You can insert explicit yields while iterating over rows, but there are two caveats:
1. It requires discipline on your part. No tool that I am aware of will help automate detecting or fixing blocking (non-yielding) code in async contexts.
2. Yielding more frequently than truly necessary can have more overhead than just running the blocking code on its own dedicated thread. So this is not only context-sensitive but very situational: in some cases you might want to yield only in outer loops, while in others it may be advantageous to yield inside inner loops.