Select_all not returning items in the order they are pushed

I am trying to aggregate a few streams with select_all, and I am not getting the behaviour I expect. The documentation states: "The stream will yield items as they become available on the underlying streams internally, in the order they become available." However, the following test suggests otherwise:

use futures::stream::select_all;
use std::f64::consts::PI;
use tokio::sync::mpsc;
use tokio_stream::{StreamExt, wrappers::UnboundedReceiverStream};

#[derive(PartialEq, Debug)]
enum StreamValue {
    Integer { value: i32 },
    Float { value: f64 },
    Text { value: String },
}

#[derive(Debug, Clone)]
pub enum Order {
    Integer,
    Float,
    Text,
}

async fn test(order1: Order, order2: Order, order3: Order) {
    // Arrange
    let (int_sender, int_receiver) = mpsc::unbounded_channel();
    let (float_sender, float_receiver) = mpsc::unbounded_channel();
    let (text_sender, text_receiver) = mpsc::unbounded_channel();

    let int_stream = UnboundedReceiverStream::new(int_receiver);
    let float_stream = UnboundedReceiverStream::new(float_receiver);
    let text_stream = UnboundedReceiverStream::new(text_receiver);

    let senders = vec![int_sender, float_sender, text_sender];
    let streams = vec![int_stream, float_stream, text_stream];

    let mut results = select_all(streams);

    send(order1.clone(), &senders);
    send(order2.clone(), &senders);
    send(order3.clone(), &senders);

    assert(order1, &mut results).await;
    assert(order2, &mut results).await;
    assert(order3, &mut results).await;
}

fn send(order: Order, senders: &[mpsc::UnboundedSender<StreamValue>]) {
    match order {
        Order::Integer => senders[0].send(StreamValue::Integer { value: 42 }).unwrap(),
        Order::Float => senders[1].send(StreamValue::Float { value: PI }).unwrap(),
        Order::Text => senders[2]
            .send(StreamValue::Text {
                value: "Hello".to_string(),
            })
            .unwrap(),
    }
}

async fn assert(order: Order, mut results: impl futures::Stream<Item = StreamValue> + Unpin) {
    // The stream is already Unpin, so it can be polled directly without Box::pin.
    let state = results.next().await.unwrap();
    match order {
        Order::Integer => assert_eq!(state, StreamValue::Integer { value: 42 }),
        Order::Float => assert_eq!(state, StreamValue::Float { value: PI }),
        Order::Text => assert_eq!(
            state,
            StreamValue::Text {
                value: "Hello".to_string()
            }
        ),
    }
}

#[tokio::test]
async fn test_all_combinations() {
    test(Order::Integer, Order::Float, Order::Text).await;

    // These tests fail!
    test(Order::Integer, Order::Text, Order::Float).await;
    test(Order::Text, Order::Integer, Order::Float).await;
    test(Order::Text, Order::Float, Order::Integer).await;
    test(Order::Float, Order::Integer, Order::Text).await;
    test(Order::Float, Order::Text, Order::Integer).await;
}

Is there anything I am doing wrong? If select_all works "as designed", how should I implement a select_all_ordered that passes the test above?

Thanks in advance for the support.

In contrast to

which states

There are no guarantees provided on the order of the list with the remaining futures. They might be swapped around, reversed, or completely random.

This is not documented for

So I'd file a bug report on this, since either the implementation is wrong or the documentation is insufficient.

The behavior you are asking for is impossible to implement. Given your code,

    send(order1.clone(), &senders);
    send(order2.clone(), &senders);
    send(order3.clone(), &senders);

    let mut results = select_all(streams);

at the point when select_all runs and examines the streams for the first time, it has no information about the order in which those streams received their items, because nothing polled them while the sends were happening; they were only polled after all 3 sends had executed. Essentially, those 3 items were all available from the beginning, and therefore select_all cannot do anything but make an arbitrary choice.

select_all — or any other function used in the position you have used it — inherently can only see the relative ordering of events if it was polled between those events. There are no global timestamps it could use to retrieve ordering information it didn’t directly observe.
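To make that concrete, here is a toy model of the situation, not how select_all is actually implemented. It assumes std::sync::mpsc channels in place of the tokio ones so that it needs no extra crates, and poll_once, batched and interleaved are hypothetical names made up for this sketch. One "poll" visits every receiver in a fixed order and takes whatever is ready, so any order it reports can only come from what it observed between polls.

```rust
use std::sync::mpsc::{self, Receiver};

/// Toy stand-in for one poll of a merged stream: visit each receiver in
/// index order and take at most one item that is already available.
fn poll_once(receivers: &[Receiver<&'static str>]) -> Vec<&'static str> {
    receivers.iter().filter_map(|rx| rx.try_recv().ok()).collect()
}

/// Both sends happen before the first poll, so the send order is lost:
/// the result follows the scan order of the receivers instead.
fn batched() -> Vec<&'static str> {
    let (int_tx, int_rx) = mpsc::channel();
    let (text_tx, text_rx) = mpsc::channel();
    text_tx.send("text").unwrap(); // sent first
    int_tx.send("int").unwrap(); // sent second
    poll_once(&[int_rx, text_rx])
}

/// A poll happens between the two sends, so the merge observes "text"
/// before "int" even exists.
fn interleaved() -> Vec<&'static str> {
    let (int_tx, int_rx) = mpsc::channel();
    let (text_tx, text_rx) = mpsc::channel();
    let receivers = [int_rx, text_rx];
    let mut seen = Vec::new();
    text_tx.send("text").unwrap();
    seen.extend(poll_once(&receivers));
    int_tx.send("int").unwrap();
    seen.extend(poll_once(&receivers));
    seen
}
```

In this model batched() reports the integer first even though the text was sent first, while interleaved() reports them in send order. The real select_all may pick a different arbitrary order than this fixed scan, but it is limited in the same way.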

Thanks for your reply.

I have inverted the order:

  • select_all first
  • sends after

Why does it still not work?

Because there are still no in-between points at which the select_all is polled and can thereby look at the streams and see which item arrived first. To illustrate, your test will (obviously) pass if you do this:

    let mut results = select_all(streams);

    send(order1.clone(), &senders);
    assert(order1, &mut results).await;
    send(order2.clone(), &senders);
    assert(order2, &mut results).await;
    send(order3.clone(), &senders);
    assert(order3, &mut results).await;

Then, the select_all is polled (by assert) and will have a chance to notice that order1 arrived before order2. Of course, now the test is trivial — it couldn't possibly produce a different order.

If your goal is for your application to be able to reliably notice what order items are sent in, then you should put all those items in one channel, not three channels.
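A minimal sketch of the single-channel approach, again assuming std::sync::mpsc to keep it free of extra crates (the senders returned by tokio's mpsc::unbounded_channel can be cloned in the same way). The function name send_through_one_channel is hypothetical, and StreamValue is repeated from the question so the snippet stands alone.

```rust
use std::sync::mpsc;

#[derive(Debug, PartialEq, Clone)]
enum StreamValue {
    Integer { value: i32 },
    Float { value: f64 },
    Text { value: String },
}

/// Sends every value through its own clone of a single sender and returns
/// the values in the order the receiver yields them.
fn send_through_one_channel(values: Vec<StreamValue>) -> Vec<StreamValue> {
    let (sender, receiver) = mpsc::channel();
    for value in values {
        // Each producer gets its own handle, but all handles feed one queue.
        let producer = sender.clone();
        producer.send(value).unwrap();
    }
    // Drop the last sender so the receiver's iterator terminates.
    drop(sender);
    receiver.iter().collect()
}
```

Because every item goes through one queue, the channel itself records the order of the sends, and the receiver does not need to be polled between them.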
