I am trying to accumulate values received from a tokio::sync::mpsc::Receiver and process them at fixed intervals, but I have run into a problem: my timeout never fires while the Receiver is being delivered values. Here is a small, complete code snippet that reproduces the problem:

    use tokio::{
        sync::mpsc,
        time::sleep,
    };
    use std::time::Duration;

    #[tokio::main]
    async fn main() {
        let (tx, mut rx) = mpsc::channel(8);

        tokio::spawn(async move {
            let mut value = 0;
            loop {
                // Generate values
                tx.send(value).await;
                value += 1;
                sleep(Duration::from_millis(10)).await;
            }
        });

        let mut values = Vec::new();
        loop {
            let sleep = sleep(Duration::from_millis(100));
            tokio::pin!(sleep);
            tokio::select! {
                _ = &mut sleep => {
                    // This is intended to fire every 100 ms, but does not whilst the above loop is providing values.
                    eprintln!("Accumulated Values : {:#?}", values);
                    values.clear();
                },
                value_event = rx.recv() => {
                    match value_event {
                        Some(value) => {
                            eprintln!("Received value {}", value);
                            values.push(value);
                        },
                        None => {}
                    }
                }
            }
        }
    }

In my actual code, the values are received from a web front end, delivered via websocket, and forwarded over mpsc to a task, where I am trying to collect them in a similar way to the above before sending them on in batches. While messages keep arriving from the browser, the timeout that is meant to flush the accumulated Vec never fires. As soon as the browser stops sending values, it fires and all the delivered messages get handled in one go. The timeout then keeps working until more values arrive, at which point it stops again. It seems as if the select! macro is prioritising the mpsc events over the time::sleep ones.
Can anybody explain why this is happening and, even better, suggest a fix?
Many thanks
Dan