I'm trying to receive updates from several channels or streams in an async context, and also run certain tasks at fixed intervals using ticks.
I currently do this with the tokio::select! macro, which switches between the incoming messages and the ticks. However, I sometimes see latency spikes, and they noticeably slow down how quickly I can react to incoming messages.
Is there a better way to approach this that reduces the latency of processing incoming messages from several streams/channels?
Some sample code is posted below; it can also be run on the Rust Playground.
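use std::time::{Duration, Instant};

use futures::channel::mpsc;
use futures::stream::{self, select_all, StreamExt};
use tokio::time::interval;

#[tokio::main]
async fn main() {
    let mut recv = Vec::new();
    let mut futures = stream::FuturesUnordered::new();

    // Spawn three sender tasks, each sending a message roughly every 3 ms.
    for i in 0..3 {
        let (tx, rx) = mpsc::unbounded();
        recv.push(rx);
        futures.push(tokio::spawn(async move {
            // tokio::time::sleep(std::time::Duration::from_millis(1000 * (i + 1))).await;
            for j in 0..1000 {
                tokio::time::sleep(Duration::from_millis(3)).await;
                let message = format!("Message {} from sender {}", j, i);
                tx.unbounded_send(message).unwrap();
            }
        }));
    }

    let mut tick = interval(Duration::from_millis(5));
    let mut other_tick = interval(Duration::from_millis(25));
    let mut final_tick = interval(Duration::from_millis(250));

    // Merge all receivers into a single stream of messages.
    let mut select_all = select_all(recv);

    loop {
        let start = Instant::now();
        tokio::select! {
            // `Some(..)` patterns disable a branch once its stream is exhausted,
            // instead of yielding `None` on every iteration.
            Some(msg) = select_all.next() => {
                println!("{:#?}", msg);
            },
            _ = tick.tick() => {
                println!("TICK 1");
            },
            _ = other_tick.tick() => {
                println!("TICK 2");
            },
            _ = final_tick.tick() => {
                println!("TICK 3");
            },
            // `next()` rather than `select_next_some()`, which panics if it is
            // polled again after the stream has terminated.
            Some(result) = futures.next() => {
                eprintln!("Sender task finished: {:?}", result);
            }
            else => break,
        }
        // Time spent in this iteration, including waiting for the next event.
        let elapsed = start.elapsed();
        if elapsed.as_millis() > 5 {
            println!("latency {:?}", elapsed);
        }
    }
}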
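
A note on the measurement itself: `start` is taken before `select!`, so `elapsed` covers the time spent waiting for the next event as well as the time spent handling it. Below is a minimal sketch of a per-message measurement instead, assuming each message carried the `Instant` at which it was sent (the code above does not do this). `Timestamped` and `LatencyStats` are hypothetical names introduced only for illustration, and the sketch uses the standard library only.

```rust
use std::time::{Duration, Instant};

/// Hypothetical wrapper: a payload together with the instant it was sent.
pub struct Timestamped<T> {
    pub sent_at: Instant,
    pub payload: T,
}

impl<T> Timestamped<T> {
    /// Stamp the payload with the current time; meant to be called by the sender.
    pub fn new(payload: T) -> Self {
        Self { sent_at: Instant::now(), payload }
    }
}

/// Running statistics over observed delivery delays.
#[derive(Debug, Default)]
pub struct LatencyStats {
    count: u32,
    total: Duration,
    worst: Duration,
}

impl LatencyStats {
    /// Record one observed delay.
    pub fn record(&mut self, delay: Duration) {
        self.count += 1;
        self.total += delay;
        self.worst = self.worst.max(delay);
    }

    /// Record the time elapsed since `msg` was stamped; meant to be called by the receiver.
    pub fn record_message<T>(&mut self, msg: &Timestamped<T>) {
        self.record(msg.sent_at.elapsed());
    }

    /// Mean delay, or `None` if nothing has been recorded yet.
    pub fn mean(&self) -> Option<Duration> {
        if self.count == 0 {
            None
        } else {
            Some(self.total / self.count)
        }
    }

    /// Largest delay recorded so far (zero if nothing has been recorded).
    pub fn max(&self) -> Duration {
        self.worst
    }
}
```

Under that assumption, the senders would send `Timestamped::new(message)`, the message branch would call `stats.record_message(&msg)`, and the 250 ms tick could print `stats.mean()` and `stats.max()`. That would separate delivery delay from the idle time between events.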