Hi, I have a function that deals with a stream that lives for the whole function scope (`stream_a`), and with many smaller streams that live for a shorter scope. The smaller streams are obtained from a stream of streams (see the signature of the function `combine()` below).
I am using futures 0.3 (`futures-preview = "0.3.0-alpha.10"` in my `Cargo.toml`).
I want to be able to select over the incoming items from the streams. There are two rules I want to follow:
- If `stream_a` ends, the function should end immediately.
- Items from any of the streams must not be lost.
My go-to tool for selecting between streams is `select()`, used like `stream1.select(stream2)`. I cannot use it here, though, because `select` consumes both streams, and `stream_a` needs to live for the entire scope of the function.
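For reference, here is a minimal sketch of the kind of usage I mean (same futures-preview setup as my attempt below; `merge_demo` and the argument names are just for illustration):

```rust
use futures::channel::mpsc;
use futures::StreamExt;

// Minimal sketch of my usual approach: `select` merges two streams into one,
// but it takes both of them by value, so neither receiver can be used on its
// own afterwards -- which is exactly the problem with stream_a here.
async fn merge_demo(stream1: mpsc::Receiver<u32>, stream2: mpsc::Receiver<u32>) {
    let mut merged = stream1.select(stream2);
    while let Some(item) = await!(merged.next()) {
        println!("item = {}", item);
    }
}
```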
Code example
This is my attempt at solving this problem:
```rust
#![feature(futures_api, pin, async_await, await_macro, arbitrary_self_types)]
#![feature(generators)]
#![feature(nll)]

use futures::{select, FutureExt, StreamExt};
use futures::channel::mpsc;

enum Item {
    A(Option<u32>),
    B(Option<u32>),
}

#[allow(unused)]
async fn combine(
    mut stream_a: mpsc::Receiver<u32>,
    mut stream_b_receiver: mpsc::Receiver<mpsc::Receiver<u32>>,
) {
    while let Some(mut stream_b) = await!(stream_b_receiver.next()) {
        loop {
            let item = select! {
                opt_item_a = stream_a.next().fuse() => Item::A(opt_item_a),
                opt_item_b = stream_b.next().fuse() => Item::B(opt_item_b),
            };
            match item {
                Item::A(Some(item)) => println!("item = {}", item),
                // stream_a ended: end the whole function immediately (rule 1).
                Item::A(None) => return,
                Item::B(Some(item)) => println!("item = {}", item),
                // The current stream_b ended: move on to the next one.
                Item::B(None) => break,
            }
        }
    }
}
```
However, I believe that this implementation is not correct: I think items from `stream_a` may be discarded when some `stream_b` ends. Each time the `stream_b` branch of the `select!` wins, the `stream_a.next()` future from the other branch is dropped, and I'm worried that an item it has already pulled out of `stream_a` is lost with it.
I'm interested to know if you think this implementation is actually valid. If not, what would be a correct way to implement it?
Extra notes
(1) When I used Python 3's asyncio, I could solve this kind of problem using a mechanism called shielding (`asyncio.shield`). Here is an example from an old code snippet:
```python
# Wait for one of the following events:
# - Get a message from the send_queue.
# - Connection is closed.
get_send_msg = asyncio.ensure_future(
    self._send_queue.get(), loop=self._loop)

# Shield the future from cancellation:
s_remote_close_fut = asyncio.shield(self._remote_close_fut, loop=self._loop)

done, pending = await asyncio.wait(
    [get_send_msg, s_remote_close_fut],
    loop=self._loop,
    return_when=asyncio.FIRST_COMPLETED)
```
I thought there might be a similar mechanism to be used with `select! {}`, but I couldn't find any.
(2) For completeness, the real-world code I'm trying to apply this to is here. It's part of the offst project.
(3) I noticed the existence of `select_next_some` and thought it might be useful here, but I still haven't managed to figure out how to apply it to this case.
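As far as I can tell from the docs, it is meant to be used roughly like the sketch below (I may well be misreading it, and I'm not sure the alpha version of `select!` I'm using accepts exactly this shape). It also doesn't obviously give me rule 1, since I never see a `None` and so can't tell which stream has ended:

```rust
use futures::channel::mpsc;
use futures::{select, StreamExt};

// Sketch only: my understanding of how select_next_some() is meant to be
// used. It yields the item itself (not an Option), and its branch stops
// being polled once the underlying stream has terminated.
async fn select_next_some_demo(
    mut stream_a: mpsc::Receiver<u32>,
    mut stream_b: mpsc::Receiver<u32>,
) {
    loop {
        select! {
            item_a = stream_a.select_next_some() => println!("a = {}", item_a),
            item_b = stream_b.select_next_some() => println!("b = {}", item_b),
            // Runs once both streams have terminated (assuming this version
            // of the macro supports a `complete` arm).
            complete => break,
        }
    }
}
```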