Hi, I have a `Stream` that is expected to return values in "bursts" (a few messages, then a longer pause, then more messages).
I am trying (without success) to write an async task that listens to the stream, aggregates messages together based on the elapsed time between them, and sends the result to a channel, all of this without blocking.
For example, task A listens to the stream and, if `next()` takes more than 1s, sends a "burst finished" message to task B. Task B listens to the data between "burst finished" messages and aggregates it.
Sorry for not coming up with even a minimal code example, but I am really struggling to wrap my head around this...
- How would one do that?
- Is there a better alternative architecture, or am I getting everything wrong from the basics?
Many thanks
It sounds like you want to implement a "debounce" or "throttle", which lets you buffer up the messages for a short while and call a function to send them down the channel when there is a gap.
I haven't had to write it in Rust before, but this is how you would implement a debounce in JavaScript. The equivalent should be doable in Rust, too.
function debounce(cb, delay = 250) {
  let timeout
  return (...args) => {
    // Cancel the previously scheduled call and schedule a new one `delay` ms from now.
    clearTimeout(timeout)
    timeout = setTimeout(() => {
      cb(...args)
    }, delay)
  }
}
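For what it's worth, here is a rough sketch of how the same clearTimeout/setTimeout pattern might look in Rust on top of tokio (assuming a tokio runtime is running). The `Debounce` type and its `call` method are made up for illustration and not taken from any crate:

use std::time::Duration;
use tokio::{task::JoinHandle, time::sleep};

// Illustrative debounce helper (not from any crate): each call aborts the previously
// scheduled flush (like clearTimeout) and schedules a new one (like setTimeout).
struct Debounce {
    delay: Duration,
    pending: Option<JoinHandle<()>>,
}

impl Debounce {
    fn new(delay: Duration) -> Self {
        Self { delay, pending: None }
    }

    // Schedule `cb` to run after `delay`, cancelling any previously scheduled call.
    fn call<F>(&mut self, cb: F)
    where
        F: FnOnce() + Send + 'static,
    {
        if let Some(handle) = self.pending.take() {
            handle.abort(); // clearTimeout
        }
        let delay = self.delay;
        self.pending = Some(tokio::spawn(async move {
            sleep(delay).await; // setTimeout
            cb();
        }));
    }
}

Calling `call` from the stream loop would then reset the timer on every message, so the callback only fires once the stream has gone quiet for `delay`.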
Throttling is slightly different: the callback runs at most once per interval, and any calls made before the cooldown has elapsed are simply dropped.
function throttle(cb, delay = 250) {
  let shouldWait = false
  return (...args) => {
    // Ignore calls while the cooldown is active.
    if (shouldWait) return
    cb(...args)
    shouldWait = true
    setTimeout(() => {
      shouldWait = false
    }, delay)
  }
}
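And a similarly rough sketch of the throttle variant in Rust, again assuming a tokio runtime; the `Throttle` type is invented for illustration:

use std::{
    sync::{
        atomic::{AtomicBool, Ordering},
        Arc,
    },
    time::Duration,
};
use tokio::time::sleep;

// Illustrative throttle helper (not from any crate): runs the callback at most once
// per `delay`, dropping calls made while the cooldown flag is set.
struct Throttle {
    delay: Duration,
    should_wait: Arc<AtomicBool>,
}

impl Throttle {
    fn new(delay: Duration) -> Self {
        Self {
            delay,
            should_wait: Arc::new(AtomicBool::new(false)),
        }
    }

    fn call<F: FnOnce()>(&self, cb: F) {
        // `swap` returns the previous value; if the cooldown was already active, skip the call.
        if self.should_wait.swap(true, Ordering::SeqCst) {
            return;
        }
        cb();
        // Clear the flag after `delay`, like the setTimeout in the JavaScript version.
        let should_wait = Arc::clone(&self.should_wait);
        let delay = self.delay;
        tokio::spawn(async move {
            sleep(delay).await;
            should_wait.store(false, Ordering::SeqCst);
        });
    }
}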
After much effort I ended up writing something like this. It somewhat works but is very cumbersome and not idiomatic at all.
use std::sync::{Arc, Mutex};

use futures::StreamExt;
use log::info;
use tokio::{
    sync::broadcast::{self, Receiver, Sender},
    time::{sleep, Duration},
};

// Sleeps for `dur`, then sends a copy of the accumulated messages and clears the buffer.
async fn flush_after(dur: Duration, items_vec: Arc<Mutex<Vec<Message>>>, tx_message: Sender<Vec<Message>>) {
    sleep(dur).await;
    let mut vec_guard = items_vec.lock().unwrap();
    let vec_to_send = (*vec_guard).clone();
    vec_guard.clear(); // clears the vector
    drop(vec_guard);
    tx_message.send(vec_to_send).unwrap();
}

// Like `flush_after`, but the flush is cancelled if a kill message arrives first.
async fn killable_flush_after(
    dur: Duration,
    mut rx_kill: Receiver<()>,
    items_vec: Arc<Mutex<Vec<Message>>>,
    tx_message: Sender<Vec<Message>>,
) {
    tokio::select! {
        _ = flush_after(dur, items_vec, tx_message) => {}, // flush_after sleeps and tries to send the vector
        _ = rx_kill.recv() => {}, // if a kill message is received, this branch completes and the flush is dropped
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // some code
    // stream: impl Stream<Item = ...>

    let (tx_message, rx_message) = broadcast::channel(100);
    let (tx_kill, rx_kill) = broadcast::channel(10);
    let max_duration = Duration::from_millis(100);
    let items_vec = Arc::new(Mutex::new(Vec::new()));

    while let Some(item) = stream.next().await {
        // Cancel any pending flush and schedule a new one `max_duration` from now.
        let _ = tx_kill.send(());
        tokio::spawn(killable_flush_after(
            max_duration,
            tx_kill.subscribe(),
            items_vec.clone(),
            tx_message.clone(),
        ));
        {
            let mut items_lock = items_vec.lock().unwrap();
            let message = deserialize_message(item)?; // TODO: catch error
            info!("SENDING - {:?}", message);
            (*items_lock).push(message); // TODO: catch error
        }
    }
    Ok(())
}
Thoughts?
If I'm understanding what you want correctly, I think you can just use `tokio::time::timeout`.
Playground
use futures::{pin_mut, StreamExt};
use tokio::time::{sleep, Duration};

#[derive(Clone, Debug)]
struct Message(usize);

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // A test stream that emits values in bursts of 10, with a 200ms pause between bursts.
    let stream = futures::stream::unfold(0, |i| async move {
        if i < 100 {
            if i != 0 && i % 10 == 0 {
                sleep(Duration::from_millis(200)).await;
            }
            Some((Message(i), i + 1))
        } else {
            None
        }
    });
    pin_mut!(stream);

    let max_duration = Duration::from_millis(100);
    let mut items = Vec::new();
    let mut done = false;
    let mut timed_out = false;

    loop {
        // `timeout` will cancel the future when the timeout fires. `futures::StreamExt::next`
        // is documented as being cancel safe here:
        // https://docs.rs/tokio/latest/tokio/macro.select.html#cancellation-safety
        // Using a non-cancel-safe future could result in losing values when the timeout
        // fires at the wrong time.
        match tokio::time::timeout(max_duration, stream.next()).await {
            Ok(Some(message)) => items.push(message),
            Ok(None) => {
                done = true;
                timed_out = true;
            }
            Err(_) => {
                timed_out = true;
            }
        }

        if timed_out {
            timed_out = false;

            if items.is_empty() {
                continue;
            }

            let burst = std::mem::take(&mut items);
            println!("{burst:?}");
        }

        if done {
            break;
        }
    }

    Ok(())
}
That is definitely a more ergonomic solution, thanks!
Depending on the details of how the system is expected to behave, you may also want to set a maximum interval over which messages are allowed to accumulate, or a limit on how many messages can accumulate before they are flushed. Otherwise your code could stall if the gap between bursts becomes shorter than your hard-coded timeout for some reason.
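A minimal sketch of the message-count limit, building on the timeout-based loop above (the maximum-interval variant would additionally track when the burst started). The `collect_burst` name, signature, and cap are invented here for illustration:

use futures::{Stream, StreamExt};
use tokio::time::{timeout, Duration};

// Sketch only: collect one burst, ending it when the stream pauses for `gap` or when
// `max_items` messages have accumulated, whichever comes first. Returns `None` once
// the stream is exhausted and nothing is left to flush.
async fn collect_burst<T>(
    stream: &mut (impl Stream<Item = T> + Unpin),
    gap: Duration,
    max_items: usize,
) -> Option<Vec<T>> {
    let mut items = Vec::new();
    loop {
        match timeout(gap, stream.next()).await {
            Ok(Some(item)) => {
                items.push(item);
                if items.len() >= max_items {
                    return Some(items); // size cap reached: end the burst early
                }
            }
            // Stream finished: hand back the final partial burst, if any.
            Ok(None) => return if items.is_empty() { None } else { Some(items) },
            // No message for `gap` and we have something buffered: the burst is over.
            Err(_) if !items.is_empty() => return Some(items),
            // Timed out with an empty buffer: keep waiting for the next burst.
            Err(_) => {}
        }
    }
}

The caller could then loop with something like `while let Some(burst) = collect_burst(&mut stream, max_duration, 1000).await { ... }` on the same pinned stream as above (1000 being an arbitrary illustrative cap), and no single burst would grow without bound.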