Accumulating and handling values at a fixed time period with async rust

I am trying to accumulate values received from tokio::sync::mpsc::Receiver and then process them at fixed intervals and have run into the problem where my timeout never happens whilst the Receiver has values being delivered. Here is a small complete code snippet that reproduces the problem:

use tokio::{
    sync::mpsc,
    time::sleep,
};
use std::time::Duration;

#[tokio::main]

async fn main() {

    let (tx, mut rx) = mpsc::channel(8);

    tokio::spawn(async move {
        let mut value = 0;

        loop {
            // Generate values
            tx.send(value).await;
            value += 1;
            sleep(Duration::from_millis(10)).await;
        }
    });

    let mut values = Vec::new();
    loop {
        let sleep = sleep(Duration::from_millis(100));
        tokio::pin!(sleep);

        tokio::select! {

            _ = &mut sleep => {
                // This is intended to fire every 100mS, but does not whilst the above loop is providing values.
                eprintln!("Accumulated Values : {:#?}", values);
                values.clear();
            },

            value_event = rx.recv() => {
                match value_event {
                    Some(value) => {
                        eprintln!("Received value {}", value);
                        values.push(value);
                    },
                    None => {}
                }
            }
        }
    }
}

In my actual code the values are received from a web front end, delivered via websocket, and forwarded to a task via mpsc where I am trying to collect them in a similar way to above before sending them in batches. All the while there are messages coming from the browser the timeout which is intended to collect a bunch of values from the accumulating Vec never happens. As soon as the browser stops sending values it fires and all the messages delivered get handled in one go. The timeout continues to work until there are more values being received at which point it stop again. It seems as if the select macro is prioritising the mpsc events over the time::sleep ones.

Can anybody explain why this is happening and even better suggest a fix?

Many thanks

Dan

None of the usual blocking issues here, so my best guess is the eprintln is somehow not being flushed... are you testing this with a simple main.rs and cargo run?

select! awaits until one of the futures is ready, executes it, then returns execution to your loop. Assume value_event happened. The code reaches the end of the loop, dropping sleep. At the beginning of the next iteration through the loop, it creates a new sleep. If the next value arrives before 100ms, this whole process repeats.

    let mut values = Vec::new();
    let mut interval = interval(Duration::from_millis(100));
    loop {
        tokio::select! {
            _ = interval.tick() => {
                eprintln!("Accumulated Values : {:#?}", values);
                values.clear();
            },

            value_event = rx.recv() => {
                match value_event {
                    Some(value) => {
                        eprintln!("Received value {}", value);
                        values.push(value);
                    },
                    None => {}
                }
            }
        }
    }
4 Likes

Doh! Seems so obvious now you point it out. I'll give this a go tonight. Many thanks.

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.