Reducing latency in processing messages from several channels / streams in an async context

I'm trying to receive updates from various channels or streams in an async context, as well as run certain tasks at various intervals using ticks.

I am currently doing this with the tokio::select! macro to switch between the various incoming messages and tasks; however, I seem to be getting some latency or lag at times, which is really affecting how quickly I can react to the incoming messages.

Is there any advice on a better way to approach this in order to reduce the latency in processing incoming messages from various streams / channels?

Some sample code is posted below, and you can run it on the playground here: Rust Playground

use futures::channel::mpsc;
use futures::stream::{self, select_all, StreamExt};
use std::time::{Duration, Instant};
use tokio::time::interval;

#[tokio::main]
async fn main() {
    // Spawn three senders that each push 1000 messages, one every 3 ms.
    let mut recv = Vec::new();
    let mut futures = stream::FuturesUnordered::new();
    for i in 0..3 {
        let (tx, rx) = mpsc::unbounded();
        recv.push(rx);
        futures.push(tokio::spawn(async move {
            // tokio::time::sleep(std::time::Duration::from_millis(1000 * (i + 1))).await;
            for j in 0..1000 {
                tokio::time::sleep(std::time::Duration::from_millis(3)).await;
                let message = format!("Message {} from sender {}", j, i);
                tx.unbounded_send(message).unwrap();
            }
        }));
    }

    // Periodic tasks at different intervals.
    let mut tick = interval(Duration::from_millis(5));
    let mut other_tick = interval(Duration::from_millis(25));
    let mut final_tick = interval(Duration::from_millis(250));
    // Merge all receivers into a single stream.
    let mut select_all = select_all(recv);

    loop {
        let start = Instant::now();
        tokio::select! {
            msg = select_all.next() => {
                println!("{:#?}", msg);
            },
            _ = tick.tick() => {
                println!("TICK 1");
            },
            _ = other_tick.tick() => {
                println!("TICK 2");
            },
            _ = final_tick.tick() => {
                println!("TICK 3");
            },
            _ = futures.select_next_some() => {
                eprintln!("Thread died");
            }
            else => break,
        }
        // Report iterations that took longer than the shortest tick interval (5 ms).
        let elapsed = start.elapsed();
        if elapsed.as_millis() > 5 {
            println!("latency {:?}", elapsed);
        }
    }
}

As always when it comes to performance, your first action (assuming you already know the current performance is subpar; if not, benchmark to see whether it is) should be profiling. Personally I use perf for this, although it isn't super useful for IO-bound operations, since it mainly measures CPU time.

Additionally, you might want to check out the biased mode for select!.
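
To illustrate (a minimal, self-contained sketch with just one channel and one tick, not your full loop): with biased; as the first thing inside the macro, the branches are polled top to bottom instead of in random order, so listing the message branch before the tick prioritises messages whenever both are ready.

use futures::channel::mpsc;
use futures::StreamExt;
use std::time::Duration;
use tokio::time::interval;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::unbounded();
    tokio::spawn(async move {
        for j in 0..10 {
            tokio::time::sleep(Duration::from_millis(3)).await;
            tx.unbounded_send(format!("Message {}", j)).unwrap();
        }
    });

    let mut tick = interval(Duration::from_millis(5));
    loop {
        // With `biased`, branches are polled in the order they are written
        // rather than in a random order, so the message branch is checked first.
        tokio::select! {
            biased;
            msg = rx.next() => {
                match msg {
                    Some(m) => println!("{}", m),
                    None => break, // sender finished and dropped the channel
                }
            },
            _ = tick.tick() => {
                println!("TICK");
            },
        }
    }
}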

Just a small thought: if you're running this test on a machine with other things running, they may be interfering with the test. In particular, a sleeping thread may need to be woken by the timer while the machine is busy doing something else at that moment.

It's just a reproducible piece of code showing what I see in my production case: I often see large latencies between iterations of the select!, so I'm trying to find ways to minimize that. In production this runs on a standalone server and still shows the same results.

I assume that when you run it you're seeing output lines like latency X, i.e. latencies over 5 ms? When I run it on my laptop I do not see these in the output. When run in the playground I do see them, but that isn't surprising since a shared server is being used.

Yeah, it's a reproducible example of what I see in production, which is much more complex. Each branch body within the select! never takes more than, say, 1 ms per loop iteration, but if I measure the time taken for one whole iteration of the loop it can sometimes be more than 100 ms. So I'm trying to understand how this can happen, when the incoming stream of messages should effectively be continuous.
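
For illustration, that distinction (time spent waiting to be woken and polled versus time spent inside a branch body) boils down to something like the following sketch, simplified to a single channel rather than the real loop:

use futures::channel::mpsc;
use futures::StreamExt;
use std::time::{Duration, Instant};

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::unbounded();
    tokio::spawn(async move {
        for j in 0..1000 {
            tokio::time::sleep(Duration::from_millis(3)).await;
            tx.unbounded_send(j).unwrap();
        }
    });

    loop {
        let wait_start = Instant::now();
        let msg = rx.next().await; // time until the task is woken and polled again
        let waited = wait_start.elapsed();

        let body_start = Instant::now();
        match msg {
            Some(m) => println!("{}", m), // the per-message work (the "branch body")
            None => break,
        }
        let body = body_start.elapsed();

        // A consistently short body with occasional spikes in `waited` points
        // at scheduling / wake-up delay rather than at the work itself.
        if waited > Duration::from_millis(10) {
            println!("waited {:?}, body {:?}", waited, body);
        }
    }
}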

Ok, I believe you that you're seeing longer latencies in production. But the code you posted does not reproduce the problem, for me anyway.


In case versions matter, I'm running with the latest:

futures = "0.3.30"
tokio = { version = "1.38", features = ["full"] }
