Determine which of a list of streams yielded an item? E.g. like futures::stream::select_all, but also returning the index?

I have a list of streams which all generate items of the same type, and want to, in a loop:

  • get the item from the next ready stream
  • know which stream the item came from

I know about futures::stream::select_all, but this seems to discard information about which stream the item came from.

I think I want something like

use tokio_stream::wrappers::WatchStream;

type Item = ...;

let receivers: Vec<WatchStream<Item>> = ...;
let mut idx_items = ???(receivers);

while let Some((idx, item)) = idx_items.next().await {
    // handle `item`, knowing it came from stream `idx`
}

You can still use select_all for this: tag each stream's items with the stream's index before merging. Define a helper function:

use futures::stream::{Stream, StreamExt};

// Wraps `stream` so that every item is paired with the given `id`.
fn with_id<S: Stream>(id: usize, stream: S) -> impl Stream<Item = (usize, S::Item)> {
    stream.map(move |item| (id, item))
}

Now you can use select_all like this:

use futures::stream::{SelectAll, StreamExt};

let mut idx_items = SelectAll::new();
// `enumerate` supplies the index that `with_id` attaches to each item.
for (id, stream) in receivers.into_iter().enumerate() {
    idx_items.push(with_id(id, stream));
}

while let Some((idx, item)) = idx_items.next().await {
    ...
}

This makes a lot of sense! Thanks for the helpful + lightning-fast response⚡️. This works like a charm.
