Please help me get rayon to work in my rust-bio bioinformatics tool (struggling with trait implementations)

Edit: I'm adding this note after getting some useful replies and changing the GitHub repository for this project quite a bit. With that in mind, if you're interested, you should be able to follow what I changed and learned.

I hope nobody thinks I'm spamming by sharing my work on GitHub. It seemed the easiest way to demonstrate my problem. Believe me, I've been reading around all day trying to figure this out.

GitHub - suchapalaver/krust: k-mer counter in Rust using the rust-bio and rayon crates is an implementation in Rust of a k-mer counter, a common tool in bioinformatics for analyzing DNA sequences. The files used for the relevant data often contain multiple sequences, so the task lends itself to parallelization/concurrency/multithreading.

I have three branches in this repo -- main, single_thread, and rayonizing.

  • main compiles, works, and provides concurrent processing using std::thread.
  • single_thread compiles and works, doing everything on a single thread.
  • rayonizing, which I hoped would do exactly what main does but with rayon, gives me the following error:
```
the method `into_par_iter` exists for struct `bio::io::fasta::Records<BufReader<File>>`, but its trait bounds were not satisfied
    --> src/lib.rs:47:22
     |
47   |       reader.records().into_par_iter().for_each(|result| {
     |                        ^^^^^^^^^^^^^ method cannot be called on `bio::io::fasta::Records<BufReader<File>>` due to unsatisfied trait bounds
     |
    ::: /home/jojo/.cargo/registry/src/github.com-1ecc6299db9ec823/bio-0.37.1/src/io/fasta.rs:1015:1
     |
1015 | / pub struct Records<B>
1016 | | where
1017 | |     B: io::BufRead,
1018 | | {
1019 | |     reader: Reader<B>,
1020 | |     error_has_occured: bool,
1021 | | }
     | |_- doesn't satisfy `_: rayon::iter::IntoParallelIterator`
     |     doesn't satisfy `_: rayon::iter::ParallelIterator`
     |
     = note: the following trait bounds were not satisfied:
             `bio::io::fasta::Records<BufReader<File>>: rayon::iter::ParallelIterator`
             which is required by `bio::io::fasta::Records<BufReader<File>>: rayon::iter::IntoParallelIterator`
             `&bio::io::fasta::Records<BufReader<File>>: rayon::iter::ParallelIterator`
             which is required by `&bio::io::fasta::Records<BufReader<File>>: rayon::iter::IntoParallelIterator`
             `&mut bio::io::fasta::Records<BufReader<File>>: rayon::iter::ParallelIterator`
             which is required by `&mut bio::io::fasta::Records<BufReader<File>>: rayon::iter::IntoParallelIterator`
```
The code I changed to produce the error is the part quoted in the error message, here from src/lib.rs on the rayonizing branch of my repository:

```rust
let reader = fasta::Reader::from_file(&filepath).unwrap();

reader.records().into_par_iter().for_each(|result| {
    let result_data = result.unwrap();
```
Please note that Send and Sync appear as auto trait implementations for the bio::io::fasta::Reader struct that I'm using from the rust-bio crate (https://docs.rs/bio/0.20.3/bio/io/fasta/struct.Reader.html#method.records), and for the Records struct returned by its records method.

I'm new to programming in general this year, since January, and this is my first program in Rust. Thank you for any help!

You need to use backticks ``` or tildes ~~~ to get properly formatted code blocks, not single quotes '''.

I suggest doing the same thing with compiler errors, it's easier to read them that way.

Regarding your actual problem, the issue is just that bio::io::fasta::Records does not implement rayon's IntoParallelIterator trait, which is required to use the into_par_iter method. In fact, it looks like the bio crate doesn't include implementations of rayon traits for any of its types, or even depend on rayon. So you won't be able to rely on prewritten parallel iterators to parallelize your computation.

(rayon, though powerful, isn't magic: to use the parallel iterator methods it provides, you have to give it something that acts like a parallel iterator. This contract is formalized by a trait, which third-party libraries must implement if they want their types to be usable with rayon.)


Thanks for your help. Sorry to be dense, but ... is that something I can try to figure out how to do: write my own implementation of rayon's IntoParallelIterator trait for bio::io::fasta::Records? Is the Advanced Traits chapter of The Book talking about this?

Records does some file IO during its iteration. Doing IO within a rayon task is discouraged, as it would block the thread pool, which may result in underutilization of cores.

The easiest thing you can do is .collect() the iterator into a Vec and process the elements in parallel. If it has too many elements to store them all in memory, read them in large chunks and process each chunk.


You can't implement a trait from one crate (rayon) for a type from another crate (bio), but you can implement the trait for a wrapper type you define and use.

However, in this case, since you are parsing a file (a fundamentally serial operation), there is probably no advantage over using rayon's ParallelBridge, which converts an ordinary serial iterator into a rayon parallel iterator (at the price of losing ordering, though if needed you can keep track of the original order with Iterator::enumerate()).


Thanks for explaining that. I’ll try to write an implementation using ParallelBridge. The order isn’t important, but I’ll have a go at getting it to work both ways using the enumerate function you point out. Did you just figure that out by looking at the docs (if so, I’m interested in what your thought process was), or do you just already know a lot about rayon? Thanks again for helping.

Thanks for your suggestions, and for your point about IO, which I’m still getting my head around as a newcomer to all of this. I’m going to try writing an implementation that follows your suggestions. Does that mean I should also try to handle writing the output to file outside of the parallelization? I’d had that figured as something that could be done faster concurrently.

Did you just figure that out by looking at the docs (if so, I’m interested in what your thought process was) or do you just already know a lot about rayon?

I've already used rayon. But if I hadn't, I might have looked at the docs for IntoParallelIterator and observed that it says “By implementing IntoParallelIterator for a type, you define how it will be transformed into an iterator”, and that it is implemented for many specific collection types, not implemented generically, and not implemented for iterators; thus, one should not expect it to automatically work for an arbitrary input type from another crate that wasn't designed for use with rayon.

Another angle is: as file reading is serial, if this is going to be possible at all, there must be a serial (Iterator or maybe Read) to parallel conversion somewhere in the process. Thus, one should look at rayon's docs for means to convert a regular Iterator to a ParallelIterator.

Finally, it's important to understand that the error “… but its trait bounds were not satisfied”, for a trait method, doesn't necessarily mean “you could do this but something else is wrong”; it only means “there was a method by this name in scope”, and that the trait has at least one generic implementation. I could write `false.into_par_iter()` and I'd get the same error message, even though a bool is not any kind of collection or iterator.


Thanks so much for sharing how someone more experienced tackles this kind of thing. It's really useful.

I'm not going to ask you to explain what I should be understanding when you write that "file reading is serial", but I don't understand it right now. Somewhat in connection, I'm confused about why my implementation in the main branch of my GitHub repo works great the way it is without using rayon, just the standard library's std::thread. I wanted to use rayon precisely because I wasn't sure whether what I'd done was the right way to handle concurrency in this project. Right now, in my blissful state of ignorance, I'm assuming that what I have below (taken from the lib.rs of the main branch of my GitHub krust repo) is an example of serial-to-parallel conversion too, but it works really well, it seems. This is an implementation that runs concurrently using std::thread:


```rust
let reader = fasta::Reader::from_file(&filepath).unwrap();

let mut threads = Vec::new();

for result in reader.records() {
    let t = thread::spawn(move || {
        let result_data = &result.unwrap();

        let pathname = format!("output/{}.tsv", result_data.id());
        let path = Path::new(&pathname);
        let display = path.display();

        let mut file = match File::create(&path) {
            Err(why) => panic!("couldn't create {}: {}", display, why),
            Ok(file) => file,
        };

        for (kmer, kmer_positions) in hash_kmers(result_data.seq(), kmer_len) {
            let rvc = revcomp(kmer);

            match str::from_utf8(kmer) {
                Err(e) => println!("Problem: {}", e),
                Ok(kmer_s) => match str::from_utf8(&rvc) {
                    Ok(rvc) => {
                        let data = format!("{}\t{}\t{}\n", kmer_s, rvc, kmer_positions.len());
                        write!(file, "{}", data).expect("Unable to write file");
                    }
                    Err(why) => panic!("couldn't write to {}: {}", display, why),
                },
            }
        }
    });
    threads.push(t);
}

for t in threads {
    t.join().expect("thread failed");
}
```

I wanted to use rayon precisely because I wasn't sure whether what I'd done was the right way to handle concurrency in this project. … I'm assuming that what I have below (taken from the lib.rs of the main branch of my GitHub krust repo) is an example of serial-to-parallel conversion too, but it works really well, it seems.

```rust
for result in reader.records() {
    let t = thread::spawn(move || {
```

This reads records as fast as possible and creates threads for each one, regardless of how much output has been done so far or how many threads exist. This will work very well if you have a small number of “records”, each of which has expensive processing to be done on a small amount of input data.

However, it creates (potentially) as many threads as there are records, and the OS's scheduler will then try to ensure that all those threads get a chance to run soon, wasting CPU resources on context switches between different parts of the same work. It is more efficient to create only as many threads as you have CPU cores (so that each thread can stay running on one core), rather than one per record (unless you have fewer records than cores!). It will also (potentially) bring all the records into memory at once, even when that is wasteful of memory.

Using Rayon should get you all of this functionality implicitly:

  • Create exactly as many threads as you have CPU cores (unless you configure it differently).
  • Wait to read more records until there are threads free to process them.
  • When a thread has finished a record, have that thread take another record rather than shutting down.

You've been very generous. Thank you. This reply saves me posting a whole other question about this!

This works! Thank you!

```rust
let reader = fasta::Reader::from_file(&filepath).unwrap();

let fasta_records: Vec<Result<fasta::Record, std::io::Error>> = reader.records().collect();

fasta_records.par_iter().for_each(|result| {
    // ...
```

I created a collect_the_iterator branch with the whole implementation here.

Thanks again for your help!

ParallelBridge worked great!

Here's the relevant code I changed after you put me onto it:

```rust
extern crate rayon;
use rayon::iter::ParallelBridge;
use rayon::prelude::ParallelIterator;

// . . .

let reader = fasta::Reader::from_file(&filepath).unwrap();

reader.records().par_bridge().for_each(|result| {
    // . . .
```

I created a ParallelBridge branch with the whole implementation here. Thanks again for your help!

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.