Possibly losing an item when using select! (Futures 0.3)


#1

Hi, I have a function that takes deals with a stream that lives for the whole function scope (stream_a), and many smaller streams that live for a shorter scope. The smaller streams are obtained from a stream of streams. (See signature of the function combine() below).

I am using Futures 0.3 (futures-preview = "0.3.0-alpha.10" in my Cargo.toml).

I want to be able to select over the incoming items from the streams. There are two rules I want to follow:

  • If stream_a ends, the function should end immediately.
  • Items from any of the streams can not be lost.

My goto tool for selecting between streams is select(), used like this: stream1.select(stream2), but I can not use it here, because select consumes both selected streams, and stream_a needs to live for the entire scope of the function.

Code example

This is my attempt at solving this problem:

#![feature(futures_api, pin, async_await, await_macro, arbitrary_self_types)]
#![feature(generators)]
#![feature(nll)]

use futures::{select, FutureExt, StreamExt};
use futures::channel::mpsc;

enum Item {
    A(Option<u32>),
    B(Option<u32>),
}

#[allow(unused)]
async fn combine(mut stream_a: mpsc::Receiver<u32>, mut stream_b_receiver: mpsc::Receiver<mpsc::Receiver<u32>>) {
    while let Some(mut stream_b) = await!(stream_b_receiver.next()) {
        loop {
            let item = select! {
                opt_item_a = stream_a.next().fuse() => Item::A(opt_item_a),
                opt_item_b = stream_b.next().fuse() => Item::B(opt_item_b),
            };
            match item {
                Item::A(Some(item)) => println!("item = {}", item),
                Item::A(None) => break,
                Item::B(Some(item)) => println!("item = {}", item),
                Item::B(None) => return,
            }
        }
    }
}

However, I believe that this implementation is not correct. I think that items from stream_a may be discarded when some stream_b ends.

I’m interested to know if you think this implementation is actually valid. If not, what would be a correct way to implement it?

Extra notes

(1) When I used Python3 asyncio, I could solve this problem using a mechanism called Shielding. Here is an example from an old code snippet:

# Wait for one of the following events:
# - Get a message from the send_queue.
# - Connection is closed.

get_send_msg = \
        asyncio.ensure_future(self._send_queue.get(),loop=self._loop)
# Shield the future from cancellation:
s_remote_close_fut = asyncio.shield(self._remote_close_fut,loop=self._loop)
done, pending = await \
        asyncio.wait([get_send_msg,s_remote_close_fut],loop=self._loop,\
            return_when=asyncio.FIRST_COMPLETED)

I thought there might be a similar mechanism to be used with select! {}, but I couldn’t find any.

(2) For completeness, the real world code I’m trying to apply this to is here. It’s part of the offst project.

(3) I noticed the existence of select_next_some and I thought it might be useful for this case, but I still didn’t manage to figure out how to use it in this case.