Nesting StreamExt::flatten_unordered producing unexpected results

I am struggling to understand the behaviour of flatten_unordered when nested.

In the below code, I expect that "VALUE" will be printed every 500ms. However, it is only printed once.

use tokio::time::interval;
use tokio_stream::wrappers;

let inner = IntervalStream::new(interval(std::time::Duration::from_millis(500)))
    .map(|_| stream::once(future::ready("VALUE")))
    .flatten_unordered(None);

let mut stream = stream::once(future::ready(inner)).flatten_unordered(None);

while let Some(value) = stream.next().await {
    dbg!(&value);
}

If I change either of the flatten_unordereds to simply flatten, then it prints every 500ms as expected. But in my use case, I want to nest flatten_unordered. For brevity I will omit my use case - I have tried to simplify the example here as much as possible to demonstrate the behaviour.

Another clue: If I change flatten_unordered(None) to flatten_unordered(Some(1)) on the inner stream, it prints.

If I simply poll the inner stream, then it works just fine:

let mut inner = IntervalStream::new(interval(std::time::Duration::from_millis(500)))
    .map(|_| stream::once(future::ready("VALUE")))
    .flatten_unordered(None);

while let Some(value) = inner.next().await {
    dbg!(&value);
}

So it seems to be something to do with nesting flatten_unordered.

From the debugging I've done so far, it looks like wake_by_ref is not being called on the inner stream's InnerWaker. But I can't understand why.

If anyone can explain to me why my code is not behaving as I expect it would be much appreciated!

Workaround

I found a workaround: Inverting the nesting of flatten_unordered fixes it. However I would still like to understand why my initial example behaves the way it does.

let inner = IntervalStream::new(interval(std::time::Duration::from_millis(500)))
    .map(|_| stream::once(future::ready("VALUE")));

let mut stream = stream::once(future::ready(inner))
    .flatten_unordered(None)
    .flatten_unordered(None);

while let Some(value) = stream.next().await {
    dbg!(&value);
}```

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.