Split a Stream by elapsed time between messages

Hi, I have a `Stream` that is expected to return values in "bursts" (a few messages, then a longer pause, then more messages).
I am trying (without success) to write an async task that listens to the stream, aggregates messages based on the elapsed time between them, and sends the result to a channel, all without blocking.
For example, task A listens to the stream and, if `next()` takes more than 1s, sends a "burst finished" message to task B. Task B listens to the data between "burst finished" messages and aggregates it.

Sorry for not coming up with even a minimal code example, but I am really struggling to wrap my head around this.

  • How would one do that?
  • Is there a better alternative architecture, or am I getting something wrong at a more basic level?

Many thanks

It sounds like you want to implement a "debounce" or "throttle", which lets you buffer up the messages for a short while and call a function to send them down the channel when there is a gap.

I haven't had to write it in Rust before, but this is how you would implement a debounce in JavaScript. The equivalent should be doable in Rust, too.

function debounce(cb, delay = 250) {
  let timeout

  return (...args) => {
    clearTimeout(timeout)
    timeout = setTimeout(() => {
      cb(...args)
    }, delay)
  }
}
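
Something along these lines might be a rough equivalent in async Rust (just a sketch, assuming the tokio and futures crates; `debounce` and `emit` are names made up for the example):

use futures::{pin_mut, Stream, StreamExt};
use tokio::time::{sleep, Duration};

// Remember only the most recent item and forward it once `delay` passes
// without any new item arriving.
async fn debounce<S: Stream>(stream: S, delay: Duration, mut emit: impl FnMut(S::Item)) {
    pin_mut!(stream);
    let mut pending = None;
    loop {
        tokio::select! {
            item = stream.next() => match item {
                // A new item arrived: keep it; the sleep below is recreated
                // (i.e. the timer restarts) on the next loop iteration.
                Some(item) => pending = Some(item),
                // The stream is finished.
                None => break,
            },
            // Only armed while something is pending: the quiet period has
            // elapsed, so emit the last item of the burst.
            _ = sleep(delay), if pending.is_some() => emit(pending.take().unwrap()),
        }
    }
    // Flush whatever is still pending when the stream ends.
    if let Some(item) = pending.take() {
        emit(item);
    }
}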

Throttling is slightly different: the callback runs immediately, but further calls are then ignored until at least the given delay has passed.

function throttle(cb, delay = 250) {
  let shouldWait = false

  return (...args) => {
    if (shouldWait) return

    cb(...args)
    shouldWait = true
    setTimeout(() => {
      shouldWait = false
    }, delay)
  }
}
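
A corresponding stream-based throttle in Rust could look roughly like this (again only an illustration; `throttle` is a made-up name):

use futures::{pin_mut, Stream, StreamExt};
use tokio::time::{Duration, Instant};

// Forward an item, then drop everything that arrives during the cooldown,
// mirroring the JavaScript version above.
async fn throttle<S: Stream>(stream: S, delay: Duration, mut emit: impl FnMut(S::Item)) {
    pin_mut!(stream);
    let mut last_emit: Option<Instant> = None;
    while let Some(item) = stream.next().await {
        let now = Instant::now();
        if last_emit.map_or(true, |t| now.duration_since(t) >= delay) {
            // Cooldown over (or first item ever): forward it and restart the window.
            last_emit = Some(now);
            emit(item);
        }
        // Otherwise we are still inside the cooldown window and the item is dropped.
    }
}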

After much effort I ended up writing something like this. It somewhat works but is very cumbersome and not idiomatic at all.

use std::sync::{Arc, Mutex};
use futures::StreamExt; // needed for stream.next() below
use log::{error, info, warn};
use tokio::{
    sync::broadcast::{self, Receiver, Sender},
    task::JoinHandle,
    time::{sleep, Duration, Instant},
};


async fn flush_after(dur: Duration, items_vec: Arc<Mutex<Vec<Message>>>, tx_message: Sender<Vec<Message>>) {
    sleep(dur).await;
    // Take the accumulated messages out of the shared vector (leaving it empty)
    // and release the lock before sending.
    let vec_to_send = {
        let mut vec_guard = items_vec.lock().unwrap();
        std::mem::take(&mut *vec_guard)
    };
    tx_message.send(vec_to_send).unwrap();
}

async fn killable_flush_after(dur: Duration, mut rx_kill: Receiver<()>, items_vec: Arc<Mutex<Vec<Message>>>, tx_message: Sender<Vec<Message>>) {
    tokio::select! {
        _ = flush_after(dur, items_vec, tx_message) => {}, // flush_after sleeps and tries to send the vector
        _ = rx_kill.recv() => {}, // if a message is received, the future rx_kill.recv() completes
    }
}

    
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {

    // some code
    // stream: something that implements futures::Stream

    let (tx_message, rx_message) = broadcast::channel(100);
    let (tx_kill, rx_kill) = broadcast::channel(10);
    let max_duration = Duration::from_millis(100);
    let items_vec = Arc::new(Mutex::new(Vec::new()));
    while let Some(item) = stream.next().await {
        let _ = tx_kill.send(());
        tokio::spawn(killable_flush_after(max_duration, tx_kill.subscribe(), items_vec.clone(), tx_message.clone()));
        {
            let mut items_lock = items_vec.lock().unwrap();
            let message = deserialize_message(item)?; // TODO: handle the error instead of propagating it
            info!("SENDING - {:?}", message);
            items_lock.push(message);
        }
    }

    Ok(())
}

Thoughts?

If I'm understanding what you want correctly, I think you can just use `tokio::time::timeout`.

Playground

use futures::{pin_mut, StreamExt};
use tokio::time::{sleep, Duration};

#[derive(Clone, Debug)]
struct Message(usize);

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let stream = futures::stream::unfold(0, |i| async move {
        if i < 100 {
            if i != 0 && i % 10 == 0 {
                sleep(Duration::from_millis(200)).await;
            }
            Some((Message(i), i + 1))
        } else {
            None
        }
    });
    pin_mut!(stream);

    let max_duration = Duration::from_millis(100);

    let mut items = Vec::new();
    let mut done = false;
    let mut timed_out = false;
    loop {
        // timeout will cancel the future when the timeout fires. futures::StreamExt::next is documented as being cancel safe here: https://docs.rs/tokio/latest/tokio/macro.select.html#cancellation-safety
        // Using a non-cancel safe future could result in losing values when the timeout fires at the wrong time.
        match tokio::time::timeout(max_duration, stream.next()).await {
            Ok(Some(message)) => items.push(message),
            Ok(None) => {
                done = true;
                timed_out = true
            }
            Err(_) => {
                timed_out = true;
            }
        }

        if timed_out {
            timed_out = false;
            if items.is_empty() {
                continue;
            }
            let burst = std::mem::take(&mut items);
            println!("{burst:?}");
        }

        if done {
            break;
        }
    }

    Ok(())
}

That is definitely a more ergonomic solution, thanks!

Depending on the details of how the system is expected to behave, you may also want to set a maximum interval over which messages are allowed to accumulate, or a cap on the number of messages in a single burst. Otherwise your code could stall if, for some reason, the gaps between bursts become shorter than your hard-coded timeout.
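
For example, a count limit could be folded into the same `timeout` loop from the previous post (only a sketch; `collect_bursts` and `max_items` are names invented here):

use futures::{pin_mut, Stream, StreamExt};
use tokio::time::Duration;

// Same timeout-driven loop as above, but also flush when the burst reaches
// `max_items`, so a stream that never pauses cannot stall the flush forever.
async fn collect_bursts<T: std::fmt::Debug>(
    stream: impl Stream<Item = T>,
    max_duration: Duration,
    max_items: usize,
) {
    pin_mut!(stream);
    let mut items = Vec::new();
    loop {
        let flush = match tokio::time::timeout(max_duration, stream.next()).await {
            // Got an item in time: keep it, and flush early if the burst is large.
            Ok(Some(message)) => {
                items.push(message);
                items.len() >= max_items
            }
            // Stream finished: flush whatever is left and stop.
            Ok(None) => {
                if !items.is_empty() {
                    println!("{items:?}");
                }
                return;
            }
            // No item within max_duration: the burst is over.
            Err(_) => true,
        };
        if flush && !items.is_empty() {
            let burst = std::mem::take(&mut items);
            println!("{burst:?}");
        }
    }
}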
