Process multiple async streams concurrently, but not in parallel

After spending over a day looking into the provided APIs & macros for what I thought would be a very standard issue, fruitlessly attempting to implement a custom solution, and scouring the net for similar issues without finding any, I've come looking for help.

Context: single-threaded async environment (Sync + !Send), not using tokio, just the standard library & helper crates (futures, pin_project, future_utils, etc.)

Consider the following function signature:

async fn execute_streams(streams: Vec<impl Stream<Item = String>>);

Objective: drive all streams to completion concurrently, processing their outputs as they arrive, as fast as possible.

Solutions I've tried:

  1. The futures::stream_select!() macro (same for futures::stream::select_all): close, but it doesn't do exactly what I need. It waits until all streams are Ready to yield their first item, then round-robins (or uses a custom strategy with select_with_strategy) over the results, then waits for all to yield the second item, and so on. If we have a fast stream with N items and a slow stream with N items, items are always yielded in pairs at the speed of the slow stream, so the fast stream can't complete, well, fast.
  2. Writing a custom StreamProcessor wrapper: this didn't get me far, since I needed to store both the stream list and their next() futures, but Next holds a mutable reference &'a mut St, and I couldn't figure out how to satisfy the compiler with a self-referential mutable borrow that would need to be updated in poll. I feel like there must be a crate or an existing function I just couldn't find that does this? If anyone knows of / is able to write this kind of generic wrapper, I'm all ears (eyes).
  3. Writing it as a function that processes the futures with FuturesUnordered. This is as close as I could get, after changing my stream's type from Stream<Item = String> to Stream<Item = (usize, String)> so each item carries its own stream's index:
    let mut futures = FuturesUnordered::new();
    for stream in streams.iter_mut() {
        futures.push(stream.next());
    }

    while let Some(stream_item_result) = futures.next().await {
        if let Some((stream_index, output)) = stream_item_result {
            // Consume output here
            let stream_next = streams[stream_index].next();
            futures.push(stream_next);
        }
    }

But of course, this doesn't compile, because streams is mutably borrowed more than once. Since I know (but the compiler doesn't) that once a stream yields a result its borrow should be relinquished, I tried storing each stream in a RefCell inside the vector (so I didn't need a mutable iterator over streams), but ran into lifetime issues with the temporarily borrowed RefMut.

I've also tried putting the stream.next() into an async move {} block, but this led to an error about the futures not having exactly the same inner type.

So, now I'm stuck. Please halp

Are you using the futures executor? In that case, it provides a LocalPool that sounds applicable to your problem.

I assume you just want to get items from the streams as fast as possible, without any streams waiting for each other.

Can you consume streams in separate loops?

join_all(all_streams.into_iter().map(|mut stream| async move {
   while let Some(item) = stream.next().await {
      consume(item);
   }
})).await; // it happens to collect into a Vec, but Vec<()> is cheap.

The consume function may need to be shareable (e.g. a method on an Rc<RefCell>-wrapped object), but each stream-iterating future would be separate, and would independently drive its stream to completion without waiting for the others.

If all else fails, there's a low-level poll_next that you can use to construct your own multi-stream-polling strategy.


I've just tested it; unfortunately this doesn't work. With the above example of a fast and a slow stream, it outputs in pairs, similar to select_all:

    // With a writer Rc<RefCell<Pin<Box<dyn AsyncWrite + '_>>>>
    join_all(streams.into_iter().map(|mut stream| {
        let writer_rc = Rc::clone(&writer);
        async move {
            while let Some((_, output)) = stream.next().await {
                let mut writer_b = writer_rc.borrow_mut();
                let _ = writer_b.deref_mut().write(output.as_bytes()).await;
            }
        }
    })).await;

Indeed, I'm using the futures executor. LocalPool looks very promising, I will try that!

It seems that to use LocalPool I'll need to refactor a big chunk of my app, including removing lifetimes from the writer, all of that just for the final step where I process the streams, because until now everything lived inside a single block_on task.

I'll work in this direction, but I'm curious whether there isn't a logical single-task solution to this issue. My impl seems so close, but I can't find a way to beat the borrow checker.

How are you implementing the slow stream for testing? That sounds like the behavior you would get if you inserted a std::thread::sleep(), which blocks the thread and prevents any join-based concurrency from taking effect.

I don't see why they would do that…

Can your write call also block for a significant amount of time? If so, running it in parallel with the other streams would help. A nested join!(stream.next(), write(previous)) could work, and you could make the streams buffered if there can be a large difference.

Alternatively, you could use an async channel to decouple stream reads from writes better: read from all the streams, but instead of calling a slow write, pass the output to an mpsc channel that will handle buffering and backpressure across all writers, and then consume the channel in another future that just feeds the outputs to write.

I had to make a clean demo of your suggestion to convince myself that it does indeed work (using threads just to emulate waiting without blocking the main thread; in my actual program I use HTTP requests against a real streaming API).
I have a couple of stream wrappers between this final stream-vec stitching code and the origin sources, so the poll coupling issue lies somewhere in there. At least this particular question is solved, thank you!