Hi, I have a ´Stream´ that is expected to return values in "bursts" (few messages, then a longer pause, then other messages).
I am trying (without success) to write an async task that listens to the stream, aggregates all messages together based on the elapsed time between messages and sends the result to a channel, all of this without blocking.
For example, task A listens to the stream and if ´next()´ is taking more than 1s then sends a "burst finished" message to task B. Task B listens to data between "burst finished" messages and aggregates them.
Sorry for not coming up with even a minimal code example but I am really struggling to wrap my head around this..
How would one do that?
Is there a better alternative architecture or I'm getting everything wrong from the basics?
It sounds like you want to implement a "debounce" or "throttle" which lets you buffer up the messages for a short while and calla function to send them down the channel when there is a gap.
I haven't had to write it in Rust before, but this is how you would implement a debounce in JavaScript. The equivalent should be doable in Rust, too.
use futures::{pin_mut, StreamExt};
use tokio::time::{sleep, Duration};
#[derive(Clone, Debug)]
struct Message(usize);
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let stream = futures::stream::unfold(0, |i| async move {
if i < 100 {
if i != 0 && i % 10 == 0 {
sleep(Duration::from_millis(200)).await;
}
Some((Message(i), i + 1))
} else {
None
}
});
pin_mut!(stream);
let max_duration = Duration::from_millis(100);
let mut items = Vec::new();
let mut done = false;
let mut timed_out = false;
loop {
// timeout will cancel the future when the timeout fires. futures::StreamExt::next is documented as being cancel safe here: https://docs.rs/tokio/latest/tokio/macro.select.html#cancellation-safety
// Using a non-cancel safe future could result in losing values when the timeout fires at the wrong time.
match tokio::time::timeout(max_duration, stream.next()).await {
Ok(Some(message)) => items.push(message),
Ok(None) => {
done = true;
timed_out = true
}
Err(_) => {
timed_out = true;
}
}
if timed_out {
timed_out = false;
if items.is_empty() {
continue;
}
let burst = std::mem::take(&mut items);
println!("{burst:?}");
}
if done {
break;
}
}
Ok(())
}
Depending on the details of how the system is expected to behave, you may also want to set a maximum interval messages are allowed to accumulate or a limit on the number of messages allowed to accumulate. Otherwise your code could stall out if the duration between bursts gets shorter than your hard coded timeout for some reason.