Split a Stream by elapsed time between messages

Hi, I have a `Stream` that is expected to return values in "bursts" (a few messages, then a longer pause, then more messages).
I am trying (without success) to write an async task that listens to the stream, groups messages together based on the elapsed time between them, and sends each group to a channel, all of this without blocking.
For example, task A listens to the stream and, if `next()` takes more than 1s, sends a "burst finished" message to task B. Task B collects the data that arrives between "burst finished" messages and aggregates it.

Sorry for not coming up with even a minimal code example, but I am really struggling to wrap my head around this.

  • How would one do that?
  • Is there a better alternative architecture, or am I getting everything wrong from the basics?

Many thanks

It sounds like you want to implement a "debounce" or "throttle", which lets you buffer up the messages for a short while and call a function to send them down the channel when there is a gap.

I haven't had to write it in Rust before, but this is how you would implement a debounce in JavaScript. The equivalent should be doable in Rust, too (see the rough sketch after the throttle example below).

function debounce(cb, delay = 250) {
  let timeout

  return (...args) => {
    clearTimeout(timeout)
    timeout = setTimeout(() => {
      cb(...args)
    }, delay)
  }
}

Throttling is slightly different: the callback runs at most once per delay, and any calls made while it is waiting are simply dropped rather than deferred.

function throttle(cb, delay = 250) {
  let shouldWait = false

  return (...args) => {
    if (shouldWait) return

    cb(...args)
    shouldWait = true
    setTimeout(() => {
      shouldWait = false
    }, delay)
  }
}
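
For what it's worth, here is a rough, untested sketch of the same debounce idea in Rust with tokio (the `Message` type, the channel, and the `debounce` function name are just placeholders, not taken from your code):

use futures::{pin_mut, Stream, StreamExt};
use tokio::sync::mpsc;
use tokio::time::{sleep, Duration};

// Placeholder for whatever your stream actually yields.
struct Message(usize);

// Buffer items from `stream` and send the whole buffer down `tx`
// whenever no new item has arrived for `delay`.
async fn debounce(
    stream: impl Stream<Item = Message>,
    delay: Duration,
    tx: mpsc::Sender<Vec<Message>>,
) {
    pin_mut!(stream);
    let mut buffer = Vec::new();
    loop {
        tokio::select! {
            item = stream.next() => match item {
                Some(msg) => buffer.push(msg),
                None => break, // stream finished
            },
            // A fresh `sleep` is created on every loop iteration, so the
            // timer effectively restarts after each received item.
            _ = sleep(delay), if !buffer.is_empty() => {
                let burst = std::mem::take(&mut buffer);
                let _ = tx.send(burst).await;
            }
        }
    }
    // Flush whatever is left when the stream closes.
    if !buffer.is_empty() {
        let _ = tx.send(buffer).await;
    }
}

With something like this, your "task B" is simply whatever reads the `Vec<Message>` bursts from the receiving end of the channel.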

After much effort I ended up writing something like this. It somewhat works but is very cumbersome and not idiomatic at all.

use std::sync::{Arc, Mutex};
use log::info;
use tokio::{
    sync::broadcast::{self, Receiver, Sender},
    time::{sleep, Duration},
};


async fn flush_after(dur: Duration, items_vec: Arc<Mutex<Vec<Message>>>, tx_message: Sender<Vec<Message>>) {
    sleep(dur).await;
    let mut vec_guard = items_vec.lock().unwrap();
    let vec_to_send = (*vec_guard).clone();
    vec_guard.clear();
    drop(vec_guard); // release the lock before sending on the channel
    tx_message.send(vec_to_send).unwrap();
}

async fn killable_flush_after(dur: Duration, mut rx_kill: Receiver<()>, items_vec: Arc<Mutex<Vec<Message>>>, tx_message: Sender<Vec<Message>>) {
    tokio::select! {
        _ = flush_after(dur, items_vec, tx_message) => {}, // flush_after sleeps and tries to send the vector
        _ = rx_kill.recv() => {}, // if a message is received, the future rx_kill.recv() completes
    }
}

    
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {

    // some code that produces `stream` (something implementing Stream/StreamExt)

    let (tx_message, rx_message) = broadcast::channel(100);
    let (tx_kill, rx_kill) = broadcast::channel(10);
    let max_duration = Duration::from_millis(100);
    let items_vec = Arc::new(Mutex::new(Vec::new()));
    while let Some(item) = stream.next().await {
        let _ = tx_kill.send(()); // cancel any pending flush task: the burst is still going
        tokio::spawn(killable_flush_after(max_duration, tx_kill.subscribe(), items_vec.clone(), tx_message.clone()));
        {
            let mut items_lock = items_vec.lock().unwrap();
            let message = deserialize_message(item)?; // TODO: handle the error instead of propagating it
            info!("SENDING - {:?}", message);
            items_lock.push(message);
        }
    }

    Ok(())
}

Thoughts?

If I'm understanding what you want correctly, I think you can just use `tokio::time::timeout`.

Playground

use futures::{pin_mut, StreamExt};
use tokio::time::{sleep, Duration};

#[derive(Clone, Debug)]
struct Message(usize);

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let stream = futures::stream::unfold(0, |i| async move {
        if i < 100 {
            if i != 0 && i % 10 == 0 {
                sleep(Duration::from_millis(200)).await;
            }
            Some((Message(i), i + 1))
        } else {
            None
        }
    });
    pin_mut!(stream);

    let max_duration = Duration::from_millis(100);

    let mut items = Vec::new();
    let mut done = false;
    let mut timed_out = false;
    loop {
        // `timeout` will cancel the inner future when the timer fires.
        // `futures::StreamExt::next` is documented as cancel safe:
        // https://docs.rs/tokio/latest/tokio/macro.select.html#cancellation-safety
        // Using a non-cancel-safe future here could result in losing values
        // when the timeout fires at the wrong time.
        match tokio::time::timeout(max_duration, stream.next()).await {
            Ok(Some(message)) => items.push(message),
            Ok(None) => {
                done = true;
                timed_out = true
            }
            Err(_) => {
                timed_out = true;
            }
        }

        if timed_out {
            timed_out = false;
            if items.is_empty() {
                continue;
            }
            let burst = std::mem::take(&mut items);
            println!("{burst:?}");
        }

        if done {
            break;
        }
    }

    Ok(())
}

That is definitely a more ergonomic solution, thanks!

Depending on the details of how the system is expected to behave, you may also want to set a maximum interval over which messages are allowed to accumulate, or a limit on the number of messages allowed to accumulate. Otherwise your code could stall out if the gaps between messages stay shorter than your hard-coded timeout for some reason.
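
For instance, here is a rough sketch of how the loop above could grow those two extra flush conditions (the `collect_bursts` helper, the `on_burst` callback, and the specific limits are made up for illustration):

use futures::{pin_mut, Stream, StreamExt};
use tokio::time::{timeout, Duration, Instant};

struct Message(usize);

// Like the loop above, but a burst is also flushed once it holds
// `max_items` messages or has been open for `max_burst_age`, so a
// continuous run of closely spaced messages can't stall it forever.
async fn collect_bursts(
    stream: impl Stream<Item = Message>,
    mut on_burst: impl FnMut(Vec<Message>),
) {
    pin_mut!(stream);

    let gap = Duration::from_millis(100);       // max silence inside a burst
    let max_items = 1_000;                      // arbitrary size limit
    let max_burst_age = Duration::from_secs(5); // arbitrary time limit

    let mut items: Vec<Message> = Vec::new();
    let mut burst_started = Instant::now();

    loop {
        let mut flush = false;
        let mut done = false;

        match timeout(gap, stream.next()).await {
            Ok(Some(message)) => {
                if items.is_empty() {
                    burst_started = Instant::now();
                }
                items.push(message);
                // Extra flush conditions: burst too big or open too long.
                if items.len() >= max_items || burst_started.elapsed() >= max_burst_age {
                    flush = true;
                }
            }
            Ok(None) => {
                flush = true;
                done = true;
            }
            Err(_) => flush = true, // the gap elapsed with no new message
        }

        if flush && !items.is_empty() {
            on_burst(std::mem::take(&mut items));
        }
        if done {
            break;
        }
    }
}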