Suitable pattern to coalesce async streams into buffered queue?

Dear Rustaceans,

I have just started to teach myself Rust and am at the stage of slowly transitioning from the Rust book examples to actually writing my first full program, but I seem to have bitten off more than I can chew with the task I need to solve. :grimacing:

Therefore, I would kindly ask for some guidance on a suitable architectural pattern and helpful crates for the following task: :pray:

I would like to process a bioinformatic file format called FastQ, essentially plain text files containing millions of separate records to iterate over. Thanks to the noodles crate, asynchronously iterating over the records of a single file is a breeze and several synchronous implementations are available, too. However, FastQ files may come in pairs, where each record in File A has a matching record in File B, and both share the same ID.

So I need to read the records from both files in such a way that I can process the information jointly in subsequent steps. In most cases the files are ordered, so that the nth records of the two files form the nth pair, but I can't bank on this. Therefore, I need a small buffer of, say, 1000 records, to which new records from both files are added; once a pair is complete, it is popped from the buffer and injected into another stream. It would actually be desirable for the program to exit should the buffer fill up too much (e.g. because files that don't belong together have been submitted by mistake and no pairs can be formed).

Could someone kindly suggest crates and names of functions I should familiarize myself with to solve this problem? To put it mildly, I am overwhelmed by the documentation for those concurrency crates. The Tokio / Futures async approach seems very tempting, because there is no CPU-heavy work involved that would justify using real threads with e.g. rayon, but the async patterns are even harder to wrap my head around. So I am also open to, e.g., filling a crossbeam queue synchronously that rayon threads then consume. Ultimately, I am open to anything, just overwhelmed by the many options, so suggestions from somebody with experience in similar tasks would be very welcome!

Thanks a lot for reading and have a nice day everybody!
Matthias

To solve this, you should be looking at ordinary non-async data structures. For example, associate each file with a hash map, and each time you read an item from the file, you first check if there's a match in the other map, otherwise put it in the file's own map. If you alternate reading from the two files, then this should give you what you're after.

Using the above pattern inside async is perfectly fine; it's not too CPU-intensive to run inside Tokio.
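
As a rough sketch of what I mean (synchronous, standard library only): `Record`, `pair_records` and `max_pending` are names made up for this example, and a real program would get its records from a FastQ reader such as noodles instead of a `Vec`. The pending limit covers the "exit if the buffer fills up" requirement.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for a parsed FastQ record: just an ID and a sequence.
#[derive(Debug, Clone, PartialEq)]
struct Record {
    id: String,
    seq: String,
}

/// Pairs records from two sources by ID, reading one record from each in turn.
/// Fails once more than `max_pending` records are waiting for their mate.
/// Records still unpaired when both sources are exhausted are dropped.
fn pair_records<A, B>(a: A, b: B, max_pending: usize) -> Result<Vec<(Record, Record)>, String>
where
    A: IntoIterator<Item = Record>,
    B: IntoIterator<Item = Record>,
{
    // `fuse()` makes it safe to keep calling `next()` after a source ran out.
    let mut a = a.into_iter().fuse();
    let mut b = b.into_iter().fuse();
    // One map per file, holding records whose mate has not been seen yet.
    let mut pending_a: HashMap<String, Record> = HashMap::new();
    let mut pending_b: HashMap<String, Record> = HashMap::new();
    let mut pairs = Vec::new();

    loop {
        let next_a = a.next();
        let next_b = b.next();
        if next_a.is_none() && next_b.is_none() {
            break;
        }
        if let Some(rec) = next_a {
            // Check the other file's map first, otherwise park the record in our own.
            match pending_b.remove(&rec.id) {
                Some(mate) => pairs.push((rec, mate)),
                None => {
                    pending_a.insert(rec.id.clone(), rec);
                }
            }
        }
        if let Some(rec) = next_b {
            match pending_a.remove(&rec.id) {
                Some(mate) => pairs.push((mate, rec)),
                None => {
                    pending_b.insert(rec.id.clone(), rec);
                }
            }
        }
        if pending_a.len() + pending_b.len() > max_pending {
            return Err(format!("more than {} unpaired records buffered", max_pending));
        }
    }
    Ok(pairs)
}
```

Calling `pair_records(records_a, records_b, 1000)` returns the pairs, or an error as soon as too many records are left waiting. Here the pairs are collected into a `Vec` for simplicity; in a streaming program you would hand each pair to the next stage instead of pushing it.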


Haven't we all, constantly :slightly_smiling_face:

I can't judge your Rust knowledge from your text, but my first intuition would be to break your problem down into smaller problems and step by step add more functionality, speed, whatever.

I'd start by doing the most naive thing:

  1. Read both files
  2. Parse the records and store them in a HashMap where the key is the record's id and the value the data stored in that record
  3. Combine the records from both files in another collection (e.g. Vec)

After that you can worry about the more advanced stuff you mentioned, like:

  • Read both files concurrently (e.g. with threads, or already using async)
  • Combine the reading, parsing and combining steps into smaller chunks (i.e. read a few entries instead of all entries at a time)
  • Make it all asynchronous using streams or message passing
  • A wrapper around HashMap that can only contain 1000 entries at a time
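
The three naive steps could look roughly like the sketch below. It assumes well-formed input with exactly four lines per record and takes the ID to be the first word after the `@`; `Record`, `parse_records` and `combine` are names invented for this example, and a real program would leave the parsing to a crate such as noodles.

```rust
use std::collections::HashMap;

/// Simplified record for this sketch: only the sequence and quality lines.
#[derive(Debug, Clone, PartialEq)]
struct Record {
    seq: String,
    qual: String,
}

/// Step 2: parse four-line FastQ records into a map keyed by record ID.
/// Assumes well-formed input; trailing lines that do not fill a record are ignored.
fn parse_records(text: &str) -> HashMap<String, Record> {
    let lines: Vec<&str> = text.lines().collect();
    let mut records = HashMap::new();
    for chunk in lines.chunks_exact(4) {
        // Header line looks like "@id optional description".
        let id = chunk[0]
            .trim_start_matches('@')
            .split_whitespace()
            .next()
            .unwrap_or("")
            .to_string();
        let record = Record {
            seq: chunk[1].to_string(),
            qual: chunk[3].to_string(),
        };
        records.insert(id, record);
    }
    records
}

/// Step 3: collect every ID present in both maps into a Vec of pairs.
fn combine(
    first: HashMap<String, Record>,
    mut second: HashMap<String, Record>,
) -> Vec<(String, Record, Record)> {
    let mut pairs = Vec::new();
    for (id, rec1) in first {
        if let Some(rec2) = second.remove(&id) {
            pairs.push((id, rec1, rec2));
        }
    }
    // HashMap iteration order is arbitrary, so sort by ID for a stable result.
    pairs.sort_by(|a, b| a.0.cmp(&b.0));
    pairs
}
```

Step 1 would then be something like `std::fs::read_to_string(path)` for each file, followed by `combine(parse_records(&text_a), parse_records(&text_b))`.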

Thank you both for your patience and very kind, helpful replies! :blush: The HashMap suggestion is witty, so I of course wanted to try this today. Sadly, I already ground to a halt way before reaching this stage. :disappointed_relieved:

Ultimately, I would like my program to take a glob expansion of the file names to be processed:

./program -1 *.pair1.fastq -2 *.pair2.fastq

Therefore, I used Clap with two Vec<PathBuf> fields as the respective data types. From both of these vectors, I would now like to take the n-th elements and combine them into instances of a custom struct FastQPair.


use std::path::Path;

#[derive(Debug, Clone)]
struct FastQPair {
    fastq1: Box<Path>,
    fastq2: Box<Path>,
}

And this is where I am stuck: taking two Vec<PathBuf> and creating an instance of paired paths for each pair of elements. :frowning:

Admittedly, ownership in Rust is still very obscure to me, but as far as I understood, PathBuf is owned, while Path isn't. Therefore, I need the Box<> in the struct, which I modelled after a tutorial example.

Since the PathBuf type remains owned (by whom? By Clap's args struct? By the Vec<>?), I will probably have to .clone() somewhere, which shouldn't hurt too much given that it is just a path. However, regardless of where I put a .to_owned() etc., I can't get it to work.

Would you mind giving me a nudge in the right direction, please?

let pairs: Vec<FastQPair> = args
        .fastq
        .unwrap()
        .iter()
        .zip(args.fastq2.unwrap().iter())
        .map(|(&f1, &f2)| FastQPair {
            fastq1: (f1),
            fastq2: (f2),
        })
        .collect();

The args.fastq and args.fastq2 fields are of the Option<Vec<PathBuf>> type:

use clap::Parser;
use std::path::PathBuf;

/// CLI parameters and arguments.
#[derive(Parser, Debug)]
struct Opts {
    /// Vector of FastQ file inputs
    #[clap(short = '1', long = "fastq1", value_parser, num_args = 1.., value_delimiter = ' ')]
    fastq: Option<Vec<PathBuf>>,
    #[clap(short = '2', long = "fastq2", value_parser, num_args = 1.., value_delimiter = ' ')]
    fastq2: Option<Vec<PathBuf>>,
}

The error message reads:

error[E0277]: the trait bound `Box<Path>: From<Vec<PathBuf>>` is not satisfied
  --> src/reader_noodles_async.rs:30:21
   |
30 |             fastq1: (f1).into(),
   |                     ^^^^ ---- required by a bound introduced by this call
   |                     |
   |                     the trait `From<Vec<PathBuf>>` is not implemented for `Box<Path>`
   |
   = help: the following other types implement trait `From<T>`:
             <Box<(dyn std::error::Error + 'a)> as From<E>>
             <Box<(dyn std::error::Error + 'static)> as From<&str>>
             <Box<(dyn std::error::Error + 'static)> as From<Cow<'a, str>>>
You tried to use a type which doesn't implement some trait in a place which
expected that trait.

Generally speaking, I (sort of) know what a trait is, but I am lost as to how I should implement the missing trait and, even worse, whether that is needed at all, because ultimately I just want a struct to hold two paths, so I can handle the pairs more easily. Using Box<Path> is not important, as long as it works... :grimacing:

Thank you so much for your help! I hope that piggybacking on my previous question is Ok(), because I was unsure, if I should rather open a new thread or not.

Have a good start into the new week
Matthias

When people say something is "owned" (not "owned by", just "owned") they mean "not borrowed". The borrowed type would be &Path.

PathBuf and Box<Path> are two different types which both serve the purpose of owning a Path. The reason you cannot own a Path directly is not because it is borrowed, but because it is dynamically sized.

It is generally more convenient to use PathBuf and not use Box<Path> or a mix of them; for example, .to_owned() on a &Path will give you a PathBuf.
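
To make the distinction concrete, here is a small sketch using only the standard library. The helper names (`own_as_path_buf`, `own_as_box`, `make_pair`) are made up for illustration, and the `FastQPair` shown is the struct from above with its fields changed to `PathBuf`.

```rust
use std::path::{Path, PathBuf};

/// Borrowed in, owned out: `to_owned()` on a `&Path` yields a `PathBuf`.
fn own_as_path_buf(borrowed: &Path) -> PathBuf {
    borrowed.to_owned()
}

/// The same path data, owned as a `Box<Path>` instead.
fn own_as_box(borrowed: &Path) -> Box<Path> {
    borrowed.into()
}

/// With `PathBuf` fields the struct owns both paths outright.
#[derive(Debug, Clone)]
struct FastQPair {
    fastq1: PathBuf,
    fastq2: PathBuf,
}

/// Builds a pair from two borrowed paths by copying them into owned `PathBuf`s.
fn make_pair(first: &Path, second: &Path) -> FastQPair {
    FastQPair {
        fastq1: first.to_owned(),
        fastq2: second.to_owned(),
    }
}
```

If you ever do need to switch between the two owned forms, `PathBuf::into_boxed_path` and `Path::into_path_buf` (called on a `Box<Path>`) convert in either direction.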


Thanks a lot for taking the time to provide me some guidance here. Ultimately, your suggestion to change the struct from Box to PathBuf directly allowed me to weasel through all issues thanks to way less obscure hints from the compiler.

Turns out that the actual problem was not in the code where I believed it to be, but in a check above. I had also included an assertion before to ensure that the Vecs are of the same length.

    // we need to assert that the number of provided arguments is the same.
    assert_eq!(
        args.fastq.unwrap().len(),
        args.fastq2.unwrap().len()
    );

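Foolishly, I had already moved ownership there for no reason: unwrap() consumes the Option, so the vectors were gone before the later code could use them. After adding .as_ref() before each unwrap() in the assertion, I could move ownership of the PathBufs into the new struct instances like I wanted. Thanks a lot!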
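
For anyone finding this later, the working version looks roughly like the sketch below. `Opts` is reduced here to a plain struct without the clap attributes, and `make_pairs` is just a name chosen for this example.

```rust
use std::path::PathBuf;

#[derive(Debug, Clone, PartialEq)]
struct FastQPair {
    fastq1: PathBuf,
    fastq2: PathBuf,
}

/// Stand-in for the clap-derived struct, without the clap attributes.
struct Opts {
    fastq: Option<Vec<PathBuf>>,
    fastq2: Option<Vec<PathBuf>>,
}

fn make_pairs(args: Opts) -> Vec<FastQPair> {
    // `as_ref()` turns `&Option<Vec<_>>` into `Option<&Vec<_>>`,
    // so `unwrap()` here only borrows the vectors instead of moving them.
    assert_eq!(
        args.fastq.as_ref().unwrap().len(),
        args.fastq2.as_ref().unwrap().len()
    );

    // Now the vectors can be consumed, moving each PathBuf into a pair.
    args.fastq
        .unwrap()
        .into_iter()
        .zip(args.fastq2.unwrap())
        .map(|(fastq1, fastq2)| FastQPair { fastq1, fastq2 })
        .collect()
}
```

Using `into_iter()` instead of `iter()` yields owned `PathBuf` values, so no `.clone()` is needed.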
