I am struggling to understand the behaviour of `flatten_unordered` when nested.
In the code below, I expect "VALUE" to be printed every 500ms. However, it is only printed once.
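```rust
use futures::{future, stream, StreamExt};
use std::time::Duration;
use tokio::time::interval;
use tokio_stream::wrappers::IntervalStream;

#[tokio::main]
async fn main() {
    // Inner stream: each 500ms tick becomes a one-item stream, then flattened.
    let inner = IntervalStream::new(interval(Duration::from_millis(500)))
        .map(|_| stream::once(future::ready("VALUE")))
        .flatten_unordered(None);
    // Outer stream: yields the inner stream once, then flattened again.
    let mut stream = stream::once(future::ready(inner)).flatten_unordered(None);
    while let Some(value) = stream.next().await {
        dbg!(&value);
    }
}
```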
If I change either of the `flatten_unordered` calls to plain `flatten`, then it prints every 500ms as expected. But in my use case, I want to nest `flatten_unordered`. For brevity I will omit my use case; I have tried to simplify the example here as much as possible to demonstrate the behaviour.
Another clue: if I change `flatten_unordered(None)` to `flatten_unordered(Some(1))` on the inner stream, it prints.
If I simply poll the inner stream, then it works just fine:
```rust
let mut inner = IntervalStream::new(interval(std::time::Duration::from_millis(500)))
    .map(|_| stream::once(future::ready("VALUE")))
    .flatten_unordered(None);
while let Some(value) = inner.next().await {
    dbg!(&value);
}
```
So it seems to be something to do with nesting `flatten_unordered`.
From the debugging I've done so far, it looks like `wake_by_ref` is not being called on the inner stream's `InnerWaker`. But I can't understand why.
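For anyone who wants to check wake-ups independently of `flatten_unordered`, here is a minimal std-only sketch of the kind of instrumentation I mean: a waker that just counts how often it is woken, polled by hand against a toy future. The names `CountingWaker`, `YieldOnce` and `poll_twice_counting_wakes` are made up for illustration; this does not touch the library's `InnerWaker`, it only shows how a missing `wake_by_ref` call could be observed.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical helper: a waker that only counts how often it is woken.
struct CountingWaker {
    wakes: AtomicUsize,
}

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.wakes.fetch_add(1, Ordering::SeqCst);
    }

    fn wake_by_ref(self: &Arc<Self>) {
        self.wakes.fetch_add(1, Ordering::SeqCst);
    }
}

/// Toy future (an assumption for this sketch): returns `Pending` on the
/// first poll after calling `wake_by_ref`, then `Ready` on the second poll.
struct YieldOnce {
    yielded: bool,
}

impl Future for YieldOnce {
    type Output = &'static str;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if self.yielded {
            Poll::Ready("VALUE")
        } else {
            self.yielded = true;
            // A well-behaved future must arrange a wake-up before returning Pending.
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

/// Polls the toy future twice by hand and returns the wake count observed
/// after the first poll together with the result of the second poll.
fn poll_twice_counting_wakes() -> (usize, Poll<&'static str>) {
    let counter = Arc::new(CountingWaker { wakes: AtomicUsize::new(0) });
    let waker = Waker::from(counter.clone());
    let mut cx = Context::from_waker(&waker);

    let mut fut = YieldOnce { yielded: false };
    let mut pinned = Pin::new(&mut fut);

    let first = pinned.as_mut().poll(&mut cx);
    assert!(first.is_pending());
    let wakes = counter.wakes.load(Ordering::SeqCst);

    let second = pinned.as_mut().poll(&mut cx);
    (wakes, second)
}

fn main() {
    let (wakes, result) = poll_twice_counting_wakes();
    println!("wakes after first poll: {wakes}, second poll: {result:?}");
}
```

If the count stays at zero after a poll that returned `Pending`, nothing has asked for the task to be polled again, which would match a stream that only ever yields once.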
If anyone can explain why my code is not behaving as I expect, it would be much appreciated!
Workaround
I found a workaround: inverting the nesting of `flatten_unordered` fixes it. However, I would still like to understand why my initial example behaves the way it does.
```rust
let inner = IntervalStream::new(interval(std::time::Duration::from_millis(500)))
    .map(|_| stream::once(future::ready("VALUE")));
let mut stream = stream::once(future::ready(inner))
    .flatten_unordered(None)
    .flatten_unordered(None);
while let Some(value) = stream.next().await {
    dbg!(&value);
}
```