Process an ordered list of items in parallel, with results streamed in order?

I have an ordered list of items that I’d like to process in parallel. Normally I would just use rayon’s par_iter. But in this case I want to have the results “streamed” to me in order through a channel Receiver that the function will return. I would like to start receiving processed items quickly, before the entire list has been processed.

This seems like it might be a common need… is there some crate that will do this?

When considering a problem like this and whether or not you can have a computer solution to it, it is helpful to reduce to the simplest case and take “computer” out of it and see if you could describe a procedure that would accomplish the goal. In this case, the simplest form would be:

  • You have a stack of sorted cards
  • You have 2 people whose job it is to check each card, and if the card has an even number written on it, they should mark it with a blue check mark. If it has an odd number on it, they should mark it with a red X.
  • When each person is done checking a card, they hand it to a 3rd person who stacks them back up. This person does not want to sort the cards; he just wants to stack them up, and when done, the stack should be in the original sorted order.

Now, what instructions would you give to the 3 people so that this could work as efficiently as possible? Then ask yourself: when you replace the first 2 people with a thread each, and the 3rd person with another thread that does the collecting, would you gain anything over just using 2 people? Why or why not?


Thanks for your response.

My end goal is to search a filesystem tree using libripgrep. I want matches to be returned in filesystem order, like what you would see in a tree view UI with all branches expanded. But I don’t want to wait for the entire filesystem to be searched before I start seeing results.

I didn’t make it clear originally, but I expect the results will be passed through a channel, or maybe a callback. I want them “streamed” (not sure that’s the correct term) so that I start getting results immediately before the entire list is processed.

  1. You have a stack of sorted cards

    Yes, or in my case an ordered list of PathBufs. Each PathBuf is associated with its index position using an enumerate iterator.

  2. You have 2 people whose job it is to check each card

    Yes, each person searches the content of the file at each path and returns whether it’s a match or not. They also return the PathBuf with its associated index.

  3. When each person is done checking a card, they hand it to a 3rd person who stacks them back up. This person does not want to sort the cards; he just wants to stack them up, and when done, the stack should be in the original sorted order.

    I’m not sure about this part.

    I think this third person does need to do some sorting, since the search results might not come back in order. But they can use a BinaryHeap to temporarily store the results ordered by index, until they receive the index they are looking for, i.e. 0, then 1, then 2, etc.

I’m most of the way through my own implementation of this. But I’m newish to Rust, and I’m wondering if it has all been done before by someone who better knew what they were doing. It seems like it must be a common pattern: you want results as fast as possible AND in order. So process in parallel from the front of the list and temporarily store results in a BinaryHeap until the “next” result is ready. Then return results in order until there’s a missing result, and wait until it appears.
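For what it’s worth, here’s a minimal sketch of that pattern in plain std Rust (no rayon). The names `process` and `process_ordered` are made up for illustration; in your case `process` would be the per-file search. Workers pull indexed tasks, and a collector thread buffers out-of-order results in a min-heap and forwards them through the returned Receiver in index order:

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;
use std::sync::{mpsc, Arc, Mutex};
use std::thread;

// Hypothetical per-item work; stands in for the real file search.
fn process(item: &str) -> String {
    item.to_uppercase()
}

// Returns a Receiver that yields results in the original order,
// even though workers may finish out of order.
fn process_ordered(items: Vec<String>, workers: usize) -> mpsc::Receiver<String> {
    let (task_tx, task_rx) = mpsc::channel::<(usize, String)>();
    let (done_tx, done_rx) = mpsc::channel::<(usize, String)>();
    let (out_tx, out_rx) = mpsc::channel::<String>();

    // Queue every item tagged with its index, then close the task channel.
    for pair in items.into_iter().enumerate() {
        task_tx.send(pair).unwrap();
    }
    drop(task_tx);

    // Worker threads share the task receiver behind a Mutex.
    let task_rx = Arc::new(Mutex::new(task_rx));
    for _ in 0..workers {
        let task_rx = Arc::clone(&task_rx);
        let done_tx = done_tx.clone();
        thread::spawn(move || loop {
            let task = task_rx.lock().unwrap().recv();
            match task {
                Ok((i, item)) => done_tx.send((i, process(&item))).unwrap(),
                Err(_) => break, // no more tasks
            }
        });
    }
    drop(done_tx);

    // Collector: buffer out-of-order results, emit in index order.
    thread::spawn(move || {
        let mut next = 0usize;
        let mut pending = BinaryHeap::new();
        for (i, result) in done_rx {
            pending.push(Reverse((i, result)));
            // Flush every result whose index is the next one expected.
            while pending.peek().map_or(false, |Reverse((i, _))| *i == next) {
                let Reverse((_, r)) = pending.pop().unwrap();
                if out_tx.send(r).is_err() {
                    return; // receiver was dropped
                }
                next += 1;
            }
        }
    });

    out_rx
}

fn main() {
    let items: Vec<String> = ["a", "b", "c", "d"].iter().map(|s| s.to_string()).collect();
    let rx = process_ordered(items, 2);
    let out: Vec<String> = rx.into_iter().collect();
    assert_eq!(out, ["A", "B", "C", "D"]);
}
```

The `Reverse` wrapper turns std’s max-heap into a min-heap, so the smallest outstanding index is always at the top.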

Ask yourself this, if that 3rd person has to sort, are you gaining any efficiency by doing things in parallel?

I do that in gifski. The key is that BinaryHeap will cheaply sort things and let you pick the smallest.


You can sort of fake the streaming if you batch it. Parallel process N ordered items, print those N ordered results, process the next N, etc. If you get fancy you can even start those next items while you print the former, like double-buffering.


Perhaps look at https://docs.rs/futures/0.1.25/futures/stream/fn.futures_ordered.html coupled with something like https://docs.rs/futures-cpupool/0.1.8/futures_cpupool/.


Thanks, this looks like a cleaner implementation of what I was doing myself. Good to learn from.

I haven't actually tried this yet, but from the description it sounds like exactly what I'm looking for.

Thanks again, I’m using this now to order the results of my async code. The futures solution looks like it might be a good turnkey solution to the entire problem of “process in parallel, stream the results in order”, but at this point I decided that I didn’t want to commit to the whole futures system. Anyway, thanks for everyone’s help.