Split a Stream by elapsed time between messages

Hi, I have a `Stream` that is expected to return values in "bursts" (a few messages, then a longer pause, then more messages).
I am trying (without success) to write an async task that listens to the stream, groups messages together based on the time elapsed between them, and sends each group to a channel, all without blocking.
For example, task A listens to the stream and, if `next()` takes more than 1s, sends a "burst finished" message to task B. Task B collects the data received between "burst finished" messages and aggregates it.

Sorry for not coming up with even a minimal code example but I am really struggling to wrap my head around this..

  • How would one do that?
  • Is there a better alternative architecture, or am I getting everything wrong from the basics?

Many thanks

It sounds like you want to implement a "debounce" or "throttle", which lets you buffer up the messages for a short while and call a function to send them down the channel when there is a gap.

I haven't had to write it in Rust before, but this is how you would implement a debounce in JavaScript. The equivalent should be doable in Rust, too.

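function debounce(cb, delay = 250) {
  let timeout

  return (...args) => {
    // Every call cancels the pending timer and starts a new one,
    // so cb only runs once the calls have paused for `delay` ms.
    clearTimeout(timeout)
    timeout = setTimeout(() => {
      cb(...args)
    }, delay)
  }
}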
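
For comparison, here is a rough Rust sketch of the same "wait for a gap" idea applied to messages that already carry a timestamp, so no timers are needed. It assumes timestamps are non-decreasing offsets from some start time; `split_bursts` and its parameters are hypothetical names, not part of any library.

```rust
use std::time::Duration;

/// Groups timestamped items into bursts: a new burst starts whenever the gap
/// to the previous item exceeds `max_gap`. Timestamps are assumed to be
/// non-decreasing offsets from some start time.
fn split_bursts<T: Clone>(items: &[(Duration, T)], max_gap: Duration) -> Vec<Vec<T>> {
    let mut bursts: Vec<Vec<T>> = Vec::new();
    let mut prev: Option<Duration> = None;
    for (at, value) in items {
        let starts_new = match prev {
            Some(p) => at.saturating_sub(p) > max_gap,
            None => true,
        };
        if starts_new {
            bursts.push(Vec::new());
        }
        // `bursts` is never empty here: the first item always starts a burst.
        bursts.last_mut().unwrap().push(value.clone());
        prev = Some(*at);
    }
    bursts
}
```

With a live stream the timestamps are not known in advance, so a timer or timeout has to play the role of `max_gap`.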

Throttling is slightly different in that you will choose to not call a callback unless there has been a gap of at least a certain duration.

function throttle(cb, delay = 250) {
  let shouldWait = false

  return (...args) => {
    // Calls that arrive while the timer is running are dropped.
    if (shouldWait) return

    cb(...args)
    shouldWait = true
    setTimeout(() => {
      shouldWait = false
    }, delay)
  }
}
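
A throttle translates to Rust fairly directly if the caller passes in the current time. The sketch below is only an illustration: `Throttle` and `allow` are made-up names, and the clock is injected as an `Instant` so the behaviour can be checked without sleeping.

```rust
use std::time::{Duration, Instant};

/// Hypothetical helper: lets a call through only if at least `delay` has
/// elapsed since the last call that was let through.
struct Throttle {
    delay: Duration,
    last: Option<Instant>,
}

impl Throttle {
    fn new(delay: Duration) -> Self {
        Self { delay, last: None }
    }

    /// Returns true if a call made at `now` should run.
    fn allow(&mut self, now: Instant) -> bool {
        match self.last {
            // Still inside the waiting window: drop the call.
            Some(prev) if now.duration_since(prev) < self.delay => false,
            // First call, or the window has passed: accept and restart it.
            _ => {
                self.last = Some(now);
                true
            }
        }
    }
}
```

In real code the caller would pass `Instant::now()` and invoke the callback only when `allow` returns true.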

After much effort I ended up writing something like this. It somewhat works but is very cumbersome and not idiomatic at all.

use std::sync::{Arc, Mutex};

use log::{error, info, warn};
use tokio::{
    sync::broadcast::{self, Receiver, Sender},
    time::{sleep, Duration, Instant},
};

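async fn flush_after(
    dur: Duration,
    items_vec: Arc<Mutex<Vec<Message>>>,
    tx_message: Sender<Vec<Message>>,
) {
    // Wait for the quiet period; if this future is dropped first, nothing is sent.
    sleep(dur).await;
    let mut vec_guard = items_vec.lock().unwrap();
    let vec_to_send = (*vec_guard).clone();
    vec_guard.clear(); // clears the vector
    let _ = tx_message.send(vec_to_send); // send fails only if there are no receivers
}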

async fn killable_flush_after(
    dur: Duration,
    mut rx_kill: Receiver<()>,
    items_vec: Arc<Mutex<Vec<Message>>>,
    tx_message: Sender<Vec<Message>>,
) {
    tokio::select! {
        _ = flush_after(dur, items_vec, tx_message) => {}, // flush_after sleeps and tries to send the vector
        _ = rx_kill.recv() => {}, // if a message is received, the future rx_kill.recv() completes
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {

    // some code
    // stream: StreamExt

    let (tx_message, rx_message) = broadcast::channel(100);
    let (tx_kill, rx_kill) = broadcast::channel(10);
    let max_duration = Duration::from_millis(100);
    let items_vec = Arc::new(Mutex::new(Vec::new()));
    while let Some(item) = stream.next().await {
        // Cancel the pending flush task (if any), then start a fresh timer.
        let _ = tx_kill.send(());
        tokio::spawn(killable_flush_after(max_duration, tx_kill.subscribe(), items_vec.clone(), tx_message.clone()));
        {
            // Scope the lock so the guard is dropped before the next await.
            let mut items_lock = items_vec.lock().unwrap();
            let message = deserialize_message(item)?; // TODO: catch error
            info!("SENDING - {:?}", message);
            (*items_lock).push(message); // TODO: catch error
        }
    }

    Ok(())
}



If I'm understanding what you want correctly, I think you can just use tokio::time::timeout.


use futures::{pin_mut, StreamExt};
use tokio::time::{sleep, Duration};

#[derive(Clone, Debug)]
struct Message(usize);

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Test stream: 100 messages with a pause before every tenth one.
    let stream = futures::stream::unfold(0, |i| async move {
        if i < 100 {
            if i != 0 && i % 10 == 0 {
                // Pause between bursts; 500 ms is an arbitrary choice,
                // anything longer than max_duration works.
                sleep(Duration::from_millis(500)).await;
            }
            Some((Message(i), i + 1))
        } else {
            None
        }
    });
    pin_mut!(stream);

    let max_duration = Duration::from_millis(100);

    let mut items = Vec::new();
    let mut done = false;
    let mut timed_out = false;
    loop {
        // timeout will cancel the future when the timeout fires.
        // futures::StreamExt::next is documented as being cancel safe here:
        // https://docs.rs/tokio/latest/tokio/macro.select.html#cancellation-safety
        // Using a non-cancel safe future could result in losing values when the timeout fires at the wrong time.
        match tokio::time::timeout(max_duration, stream.next()).await {
            Ok(Some(message)) => items.push(message),
            Ok(None) => {
                // End of stream: flush whatever is left, then stop.
                done = true;
                timed_out = true
            }
            Err(_) => {
                timed_out = true;
            }
        }

        if timed_out {
            timed_out = false;
            // Timeouts that fire during a pause with nothing buffered are ignored.
            if !items.is_empty() {
                let burst = std::mem::take(&mut items);
                // A real task would send `burst` down a channel here.
                println!("{burst:?}");
            }
        }

        if done {
            break;
        }
    }

    Ok(())
}
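
The shape of that loop does not depend on tokio. As a rough illustration only, here is the same "push until the timeout fires, then flush" logic written with blocking threads and `std::sync::mpsc::Receiver::recv_timeout`. This swaps the async stream for a std channel, so it is not a replacement for the code above; `collect_bursts` and `demo` are hypothetical names.

```rust
use std::sync::mpsc::{self, Receiver, RecvTimeoutError};
use std::thread;
use std::time::Duration;

/// Collects values from `rx` into bursts, closing a burst whenever no value
/// arrives within `max_gap`. Returns once every sender has been dropped.
fn collect_bursts<T>(rx: Receiver<T>, max_gap: Duration) -> Vec<Vec<T>> {
    let mut bursts = Vec::new();
    let mut items = Vec::new();
    loop {
        match rx.recv_timeout(max_gap) {
            Ok(value) => items.push(value),
            Err(RecvTimeoutError::Timeout) => {
                // Quiet period: close the current burst if there is one.
                if !items.is_empty() {
                    bursts.push(std::mem::take(&mut items));
                }
            }
            Err(RecvTimeoutError::Disconnected) => {
                // Channel closed: flush the remainder and stop.
                if !items.is_empty() {
                    bursts.push(items);
                }
                return bursts;
            }
        }
    }
}

/// Hypothetical producer: two bursts separated by a pause longer than the gap.
fn demo() -> Vec<Vec<u32>> {
    let (tx, rx) = mpsc::channel();
    let producer = thread::spawn(move || {
        for i in 0..3 {
            tx.send(i).unwrap();
        }
        thread::sleep(Duration::from_millis(400));
        for i in 3..5 {
            tx.send(i).unwrap();
        }
    });
    let bursts = collect_bursts(rx, Duration::from_millis(100));
    producer.join().unwrap();
    bursts
}
```

Because this version blocks a thread while waiting, it only fits code that is not running inside an async executor.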


That is definitely a more ergonomic solution, thanks!

Depending on the details of how the system is expected to behave, you may also want to cap how long messages are allowed to accumulate, or how many of them. Otherwise your code could stall if the gap between bursts becomes shorter than your hard-coded timeout for some reason, because the buffer would then never be flushed.
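
One possible way to add such limits, sketched with the standard library only: a small buffer that hands back the burst as soon as it holds too many items or its first item is too old. `BurstBuffer`, `max_len` and `max_age` are hypothetical names, and the clock is passed in as an `Instant` so the logic is independent of any runtime.

```rust
use std::time::{Duration, Instant};

/// Hypothetical buffer that reports when a burst should be flushed early.
struct BurstBuffer<T> {
    items: Vec<T>,
    started: Option<Instant>, // arrival time of the first buffered item
    max_len: usize,
    max_age: Duration,
}

impl<T> BurstBuffer<T> {
    fn new(max_len: usize, max_age: Duration) -> Self {
        Self { items: Vec::new(), started: None, max_len, max_age }
    }

    /// Buffers `item` (arriving at `now`) and returns the whole burst if a
    /// limit has been reached.
    fn push(&mut self, item: T, now: Instant) -> Option<Vec<T>> {
        let started = *self.started.get_or_insert(now);
        self.items.push(item);
        let too_many = self.items.len() >= self.max_len;
        let too_old = now.duration_since(started) >= self.max_age;
        if too_many || too_old {
            Some(self.flush())
        } else {
            None
        }
    }

    /// Takes whatever is buffered, e.g. when the idle timeout fires.
    fn flush(&mut self) -> Vec<T> {
        self.started = None;
        std::mem::take(&mut self.items)
    }
}
```

In the timeout loop, `push` would replace the plain `items.push(message)`, and `flush` would be called when the timeout fires. Note that the age limit is only checked when a message arrives, which is enough here because the idle timeout covers the quiet case.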