Async puzzle: Prepare output on multiple threads?

We have a bioinformatics library that outputs a large ndarray to a binary format. It works column by column. I can't figure out how to make it run faster by, for example, converting the next column to binary while writing the current column. Maybe this is something async can help with.

The single-threaded example below is simplified but captures the gist of the problem.
I'd love to see how to make this multithreaded with something like async.
Thanks to anyone who can help with this. (I'd think it would be a common pattern.)

- Carl
use ndarray as nd;

use std::{
    fs::File,
    io::{BufWriter, Write},
};

use thiserror::Error;

#[derive(Error, Debug)]
pub enum BedError {
    #[error("Attempt to write illegal value to BED file. Only 0,1,2,missing allowed. '{0}'")]
    BadValue(String),

    #[error(transparent)]
    IOError(#[from] std::io::Error),
}

pub fn write(
    filename: &str,
    iid_count: usize,
    sid_count: usize,
    high: f64,
) -> Result<(), BedError> {
    assert!(iid_count % 4 == 0, "iid_count must be a multiple of 4");
    let iid_count_div4 = iid_count / 4;
    let val = nd::Array::from_elem((iid_count, sid_count), high-0.01);
    
    let mut writer = BufWriter::new(File::create(filename)?);
    for column in val.axis_iter(nd::Axis(1)) {
        // Convert each column into a bytes_vector
        let mut bytes_vector: Vec<u8> = vec![0; iid_count_div4]; // inits to 0
        for (iid_i, &v0) in column.iter().enumerate() {
            let byte = if v0 < 4.0 {
                (v0 / 4.0f64).floor() as u8
            } else {
                return Err(BedError::BadValue(filename.to_string()));
            };
            let i_div_4 = iid_i / 4;
            let i_mod_4 = iid_i % 4;
            bytes_vector[i_div_4] |= byte << (i_mod_4 * 2);
        }
        // Write the bytes vector
        writer.write_all(&bytes_vector)?;
    }
    Ok(())
}

#[test]
fn test1() {
    write("test1.bed", 12, 10, 4.0).unwrap();
}

#[test]
fn test2() {
    let result = write("test2.bed", 12, 10, 5.0);
    assert!(result.is_err());
}


(Playground)

Output:


running 2 tests
test test1 ... ok
test test2 ... ok

test result: ok. 2 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out; finished in 0.00s


running 0 tests

test result: ok. 0 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out; finished in 0.00s


Errors:

   Compiling playground v0.0.1 (/playground)
    Finished test [unoptimized + debuginfo] target(s) in 2.88s
     Running unittests (target/debug/deps/playground-942e3b5cb80c1398)
   Doc-tests playground

I am not fully aware of the scale of the problem that you're solving (like the amount of data being handled, sizes of matrices, target memory usage etc). However, there are a few observations that I would like to make:

  • You ran the tests in debug mode, which does not accurately reflect how long the program would take. You can run the tests in release mode with cargo test --release.
  • Async programming is not inherently multi-threaded; whether it is depends on how the runtime you're using is configured. The real power of async shows when you have a large number of IO-bound tasks. Some good rules of thumb are given in the Tokio docs. From what I can see, your task is not particularly IO-bound. It is probably more compute-bound, but you can tell us more about that.
  • Assuming that you have really large matrices, I would say that your task is compute-bound, which means that you're better off spinning up several threads to calculate the byte vector of each column. Since manually scheduling work onto threads is kind of a pain, you can use a thread pool, like the one from Rayon.
  • On the other hand, your IO can run on a separate thread, probably a single one. How do you send the data to it? You can use a channel, such as the one from crossbeam, which has higher performance than the one from std. The calculation threads send the byte vectors and the IO thread receives them.
  • This introduces a problem, however: the byte vectors arrive at the IO thread out of order. I am assuming it is important to store the columns in order in the file. There are a couple of mitigations I can think of:
    • Change the way you represent the columns - store the column number and store them out of order.
    • Change the way you store the files - store each column in a separate file and join them later when reading them in.
    • Use file seeking - if you can calculate the byte offset of a column within the file from its column number, then you can seek to it. Seeking on Linux is actually quite fast, so this may not be a bad idea.
  • Additionally, you can use async on the thread that does the IO. The Tokio docs have a chapter on how to bridge blocking and async code. How much of a speedup you'd get from this is not clear to me a priori. You could try experimenting.
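
To make the worker-plus-channel idea concrete, here is a minimal sketch. It uses only the standard library (std::thread::scope and std::sync::mpsc) in place of Rayon and crossbeam, so it is a swapped-in variant and not exactly what I described. It also handles ordering in a way not listed above: the calling thread acts as the writer and parks early arrivals in a map until their turn comes. The names pack_column and write_in_order, and the use of Vec<u8> columns holding values 0..=3, are hypothetical stand-ins for your real encoding.

```rust
use std::collections::BTreeMap;
use std::io::{self, Write};
use std::sync::mpsc;
use std::thread;

/// Pack one column into 2 bits per value (hypothetical stand-in for the
/// real encoding; values are assumed to already be in 0..=3).
fn pack_column(column: &[u8]) -> Vec<u8> {
    let mut bytes = vec![0u8; (column.len() + 3) / 4];
    for (i, &v) in column.iter().enumerate() {
        bytes[i / 4] |= (v & 0b11) << ((i % 4) * 2);
    }
    bytes
}

/// Pack columns on `n_workers` threads and write them to `out` in column order.
fn write_in_order<W: Write>(
    columns: &[Vec<u8>],
    n_workers: usize,
    out: &mut W,
) -> io::Result<()> {
    let n_workers = n_workers.max(1);
    // Bounded channel: workers block when the writer falls behind.
    let (tx, rx) = mpsc::sync_channel::<(usize, Vec<u8>)>(2 * n_workers);

    thread::scope(|s| {
        for w in 0..n_workers {
            let tx = tx.clone();
            s.spawn(move || {
                // Worker w handles columns w, w + n_workers, w + 2 * n_workers, ...
                for i in (w..columns.len()).step_by(n_workers) {
                    if tx.send((i, pack_column(&columns[i]))).is_err() {
                        break; // the writer stopped early (for example, an IO error)
                    }
                }
            });
        }
        drop(tx); // the receive loop ends once every worker has finished

        // Columns that arrived before their turn wait here.
        let mut pending = BTreeMap::<usize, Vec<u8>>::new();
        let mut next: usize = 0;
        for (i, bytes) in rx {
            pending.insert(i, bytes);
            while let Some(bytes) = pending.remove(&next) {
                out.write_all(&bytes)?;
                next += 1;
            }
        }
        Ok(())
    })
}
```

Note that the map of pending columns is not itself bounded: if one worker lags far behind the others, early columns pile up there. For a strict memory cap you would need something more elaborate.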

@RedDocMD, thanks for your comments and insights. Here is some more info:

  • The real files on disk can sometimes be as large as 1M rows x 1M columns (which is larger than main memory), so for this writing problem say up to 1M rows x 50K columns.
  • Sadly, the real, non-simplified format is a standard (called PLINK Bed) so we must keep the columns in order.
  • For larger arrays, it is IO bound, but there is a CPU component. When it runs (with this bigger size and in release mode), it will alternate between IO and CPU, which seems inefficient.
  • My old C++ version of this used two threads:
    • thread 1: write the buffer from column i
    • thread 2: prepare a buffer for column i+1
      then wait for both threads to finish and increment
      (This is kind of nice because the only memory it uses is a 2nd buffer)

So now I'm wondering if Rust can do this (or something better).

[Aside: Now I wonder whether Rust's buffered output already does the right thing: BufWriter in std::io - Rust (rust-lang.org)]

- Carl

You can definitely do this in Rust in almost the same way you did it in C++. You won't need async for that. Just thread::spawn.
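
Here is a rough sketch of that two-thread scheme, under some assumptions: I use std::thread::scope instead of a bare thread::spawn so the writer thread can borrow the output, and two channels to pass the two buffers back and forth. The names pack_into and write_pipelined are made up for illustration, and the columns are plain Vec<u8> values in 0..=3 rather than your real ndarray of f64.

```rust
use std::io::{self, Write};
use std::sync::mpsc;
use std::thread;

/// Pack one column into `buf` at 2 bits per value, reusing the allocation
/// (hypothetical stand-in for the real encoding; no validation).
fn pack_into(column: &[u8], buf: &mut Vec<u8>) {
    buf.clear();
    buf.resize((column.len() + 3) / 4, 0);
    for (i, &v) in column.iter().enumerate() {
        buf[i / 4] |= (v & 0b11) << ((i % 4) * 2);
    }
}

/// Write column i on one thread while column i + 1 is packed on another.
fn write_pipelined<W: Write + Send>(columns: &[Vec<u8>], out: &mut W) -> io::Result<()> {
    // Filled buffers travel to the writer; emptied buffers travel back for reuse.
    let (full_tx, full_rx) = mpsc::sync_channel::<Vec<u8>>(1);
    let (empty_tx, empty_rx) = mpsc::channel::<Vec<u8>>();
    // Exactly two buffers exist, as in the C++ design.
    empty_tx.send(Vec::new()).unwrap();
    empty_tx.send(Vec::new()).unwrap();

    thread::scope(|s| {
        let writer = s.spawn(move || -> io::Result<()> {
            for buf in full_rx {
                out.write_all(&buf)?;
                // Hand the buffer back; an error only means the producer is done.
                let _ = empty_tx.send(buf);
            }
            Ok(())
        });

        for column in columns {
            // Blocks until one of the two buffers is free again.
            let mut buf = match empty_rx.recv() {
                Ok(buf) => buf,
                Err(_) => break, // the writer exited early
            };
            pack_into(column, &mut buf);
            if full_tx.send(buf).is_err() {
                break; // the writer exited early
            }
        }
        drop(full_tx); // tells the writer there is nothing more to write
        writer.join().expect("writer thread panicked")
    })
}
```

The only extra memory is the second buffer, and the order of columns is preserved because there is a single producer and a single writer.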

That's true. But you have a single large file to write to. Async won't help here either, since it pays off when you have multiple slow IO tasks (like handling several network connections or web requests, or writing several files to a particularly slow HDD).

I would suggest you try the seek method, since I think for matrices the columns all have the same length (which means you can calculate the offset of a column in the file from its column number). Depending on your OS and machine, you can get a performance bump.
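
A minimal sketch of the offset calculation, assuming every packed column has the same byte length. The header_len parameter is an assumption for a fixed-size header at the start of the file (pass 0 if there is none), and column_offset and write_column_at are names I made up.

```rust
use std::io::{self, Seek, SeekFrom, Write};

/// Byte offset of column `col` when every packed column has the same length.
/// `header_len` is an assumed fixed-size header before the first column.
fn column_offset(header_len: u64, bytes_per_column: u64, col: u64) -> u64 {
    header_len + col * bytes_per_column
}

/// Seek to where column `col` belongs and write its packed bytes there,
/// so columns can be written in any order.
fn write_column_at<F: Write + Seek>(
    file: &mut F,
    header_len: u64,
    col: u64,
    bytes: &[u8],
) -> io::Result<()> {
    let offset = column_offset(header_len, bytes.len() as u64, col);
    file.seek(SeekFrom::Start(offset))?;
    file.write_all(bytes)
}
```

This works with anything that implements Write and Seek, including std::fs::File. If several threads write at once, each would need its own file handle or some locking around the seek-then-write pair; I have not measured how that compares to a single writer thread.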

So the maximum memory you need per column is less than 8 MiB. You probably want to bound the channel size based on this and on the amount of memory you have available. That way, you won't exhaust memory with too many in-flight buffers.
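
The arithmetic behind that number, as a small sketch: a column of 1M f64 values is 8,000,000 bytes (just under 8 MiB), while the packed output at 2 bits per value is only 250,000 bytes. The function names and the idea of a fixed memory budget are my own assumptions for illustration.

```rust
/// Bytes for one column held as f64 values.
fn f64_column_bytes(iid_count: usize) -> usize {
    iid_count * std::mem::size_of::<f64>()
}

/// Bytes for one packed column at 2 bits per value, rounded up to whole bytes.
fn packed_column_bytes(iid_count: usize) -> usize {
    (iid_count + 3) / 4
}

/// Number of in-flight buffers that fit in `memory_budget_bytes` (at least 1).
fn channel_bound(memory_budget_bytes: usize, bytes_per_buffer: usize) -> usize {
    (memory_budget_bytes / bytes_per_buffer.max(1)).max(1)
}
```

With a 64 MiB budget, for example, that allows 8 unpacked columns or 268 packed ones in flight. The result can be passed as the capacity of a bounded channel, such as std::sync::mpsc::sync_channel or crossbeam's bounded.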