Hey folks. I'm wondering if I can get some high-level help on how to solve a certain problem. TL;DR: I receive a continuous stream of Strings. For each one I want to call an async action, process several at the same time, and output the results in the same order I received the strings.
Longer version: I currently send each string through an MPSC channel for processing.
For each string I receive on the MPSC receiver, I want to call an async method, process_img_by_name (which returns a Vec<u8>), without awaiting it in the while loop. When process_img_by_name finishes for a String, I want to send the bytes to an UnboundedSender.
Here's the tricky part: I want the results to go through the sender in the same order I received the strings.
Here's an example to clarify:
1. The receiver gets "String A".
2. It calls process_img_by_name, which takes 2s and yields VecA.
3. The receiver gets "String B".
4. It calls process_img_by_name, which takes 500ms and yields VecB.
5. VecA gets sent to the output UnboundedSender.
6. VecB gets sent to the output (after VecA, even though B finished first).
I'm not sure I can use FuturesOrdered for this, since the method that sends the names into the MPSC channel receives them from a Stream, so I can't know all of them ahead of time.
Here's the MPSC receiver method, in case it helps clarify what I'm trying to do:
```rust
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};

pub async fn process_image_listener(&self, mut rx: UnboundedReceiver<String>, output: UnboundedSender<Vec<u8>>) {
    while let Some(image_name) = rx.recv().await {
        // Call process_img_by_name, but don't await it in this loop so we can
        // start processing other requests at the same time.
        tokio::spawn(process_img_by_name(image_name));
        // Missing piece: forward each result to `output` in the order received.
    }
}
```
I'm looking for some high-level ideas on how to solve this.
In your case, the receiving side needs to keep track of the order of the received strings in some state (e.g. a VecDeque) and then call process_img_by_name to do the work asynchronously.
Since the receiving side is the one that knows the correct order, you need a way for process_img_by_name to report back when its work is done. One way to do this is with another MPSC channel: clone its sender and pass it to process_img_by_name.
Then the only thing left is to manage that state.
> you need a way to communicate back from process_img_by_name when the work is done. One way you could do this is using another MPSC channel
I think this would still have the same pitfall: if B finishes before A, I'd have no way of making sure A is sent first, no? How does this solve that?
Okay, so the receiver has a VecDeque: [A, B].
Both A and B are being processed asynchronously by process_img_by_name. B finishes first and sends a message to the receiver.
Hmm, we could peek at the queue, see that A is first, and do nothing. But when A's message arrives and A is at the front, we pop the queue. Is that what you're thinking?
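The exchange above leaves one step implicit: when B finishes first, its bytes have to be held somewhere, and once A is popped, B must be flushed right after it. One way to do that (a variation on the VecDeque idea, not necessarily what the reply had in mind) is to tag each name with a sequence number 0, 1, 2, ... as it is received, have each worker send `(seq, bytes)` back over the second channel, and keep early results in a `BTreeMap` until their turn comes. `Reorderer` and its `push` method are hypothetical names, and only the ordering state is shown, not the channel plumbing.

```rust
use std::collections::BTreeMap;

/// Reorders results that arrive tagged with a sequence number.
/// Assumes sequence numbers are assigned 0, 1, 2, ... in receive order.
pub struct Reorderer<T> {
    // Sequence number of the next result allowed to go out.
    next_seq: u64,
    // Results that finished early, keyed by sequence number.
    pending: BTreeMap<u64, T>,
}

impl<T> Reorderer<T> {
    pub fn new() -> Self {
        Reorderer { next_seq: 0, pending: BTreeMap::new() }
    }

    /// Records a finished result and returns every result that is now ready
    /// to send, in order. Returns an empty Vec if an earlier item is still pending.
    pub fn push(&mut self, seq: u64, value: T) -> Vec<T> {
        self.pending.insert(seq, value);
        let mut ready = Vec::new();
        // Flush the contiguous run of results starting at `next_seq`.
        while let Some(value) = self.pending.remove(&self.next_seq) {
            ready.push(value);
            self.next_seq += 1;
        }
        ready
    }
}
```

In the listener loop, each `(seq, bytes)` message from the back channel could then be handled with something like `for bytes in reorderer.push(seq, bytes) { let _ = output.send(bytes); }`.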
There's `StreamExt::buffered` in the futures crate, which might be sufficient for you. You'd map your stream to futures (if you use tokio::spawn, that's where it would happen), call buffered, and then map it again (or use a while loop) to send the results to the output channel.
The downside is that it only holds n futures at a time, including completed ones that are waiting for an earlier future to finish, so if completion times vary a lot, this might not be ideal. I think the queue idea above would let you have n pending futures and arbitrarily many completed ones, but you'll need to organize them yourself. MaybeDone might be useful for that.
Note that much of the complexity of buffered and related combinators in the futures crate lies in making sure all the futures are polled at the right time. If you're definitely using tokio::spawn, this matters less, because spawned tasks are driven by the runtime on their own, and you can simply await the next future (the next JoinHandle) in order.