Tokio select with look-ahead

I have a tokio task that listens to two channels, channel A and channel B. Both channels carry the same type of object, and each object contains a timestamp. I would like my task to listen to both channels and, if one of them has an object, take it and process it. If both of them have an object, I want to take the one with the smaller timestamp, and if the timestamps are equal, I want to take the object from channel A.

How can I do that in a non-messy way? I started writing it, but it became an unreadable mess, and I bet there is a clean, native way to do it.

I assume you want to buffer the other object for later retrieval instead of dropping it, right?

You can create a wrapper with an internal buffer that merges the two channels and implement a poll method for it, as you would for a custom Future. Tokio's mpsc Receiver exposes a poll_recv() API, which makes it easy to write your own polling function; then you can use poll_fn() to wrap it in an async function.

here's a demo: Rust Playground
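A minimal sketch of that idea (not the linked playground code) is below. To keep it runnable with only the standard library, it assumes a hypothetical `FakeRx` stand-in whose `poll_recv` follows the same Ready/Pending contract as tokio's `Receiver::poll_recv`, a hypothetical `Msg` type, and a toy busy-polling `block_on`. In a real task you would hold two `tokio::sync::mpsc::Receiver`s and just `.await` the `next()` method. Note that `poll_next` refills every empty slot on every call, so a buffered message is always compared against a fresh look-ahead from the other channel.

```rust
use std::collections::VecDeque;
use std::future::{poll_fn, Future};
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical message type: the thread only says each object carries a timestamp.
#[derive(Debug, Clone, PartialEq)]
struct Msg {
    timestamp: u64,
    source: char,
}

// Hypothetical stand-in for tokio's mpsc Receiver. `poll_recv` follows the same contract:
// Ready(Some) = message, Ready(None) = closed and drained, Pending = nothing yet.
struct FakeRx {
    queue: VecDeque<Msg>,
    closed: bool,
}

impl FakeRx {
    // A closed channel pre-filled with messages carrying the given timestamps.
    fn closed(source: char, stamps: &[u64]) -> Self {
        let queue = stamps.iter().map(|&timestamp| Msg { timestamp, source }).collect();
        FakeRx { queue, closed: true }
    }

    fn poll_recv(&mut self, _cx: &mut Context<'_>) -> Poll<Option<Msg>> {
        match self.queue.pop_front() {
            Some(msg) => Poll::Ready(Some(msg)),
            None if self.closed => Poll::Ready(None),
            // A real receiver registers cx.waker() before returning Pending.
            None => Poll::Pending,
        }
    }
}

// The merging wrapper: at most one look-ahead message buffered per channel.
struct Merged {
    rx_a: FakeRx,
    rx_b: FakeRx,
    buf_a: Option<Msg>,
    buf_b: Option<Msg>,
}

impl Merged {
    fn new(rx_a: FakeRx, rx_b: FakeRx) -> Self {
        Merged { rx_a, rx_b, buf_a: None, buf_b: None }
    }

    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Msg>> {
        // Refill every empty slot on every call, remembering which channels are closed.
        let (mut a_closed, mut b_closed) = (false, false);
        if self.buf_a.is_none() {
            match self.rx_a.poll_recv(cx) {
                Poll::Ready(Some(msg)) => self.buf_a = Some(msg),
                Poll::Ready(None) => a_closed = true,
                Poll::Pending => {}
            }
        }
        if self.buf_b.is_none() {
            match self.rx_b.poll_recv(cx) {
                Poll::Ready(Some(msg)) => self.buf_b = Some(msg),
                Poll::Ready(None) => b_closed = true,
                Poll::Pending => {}
            }
        }
        // Smallest timestamp wins; ties go to A; a lone message is returned immediately.
        match (&self.buf_a, &self.buf_b) {
            (Some(a), Some(b)) if a.timestamp <= b.timestamp => Poll::Ready(self.buf_a.take()),
            (Some(_), Some(_)) | (None, Some(_)) => Poll::Ready(self.buf_b.take()),
            (Some(_), None) => Poll::Ready(self.buf_a.take()),
            (None, None) if a_closed && b_closed => Poll::Ready(None),
            (None, None) => Poll::Pending,
        }
    }

    // poll_fn turns the poll method into something you can `.await`.
    async fn next(&mut self) -> Option<Msg> {
        poll_fn(|cx| self.poll_next(cx)).await
    }
}

// Toy executor for the demo only: busy-polls with a waker that does nothing.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}
```

With real receivers, `Pending` is only returned after every empty slot has been polled (and has therefore registered the waker), which is what lets the task sleep until either channel has data.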

This doesn't actually require any selection.

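let mut next_a = None;
let mut next_b = None;
// Each iteration refills whichever slot is empty, awaiting A first and then B,
// so a message is only handed out once both slots are filled or a channel is closed.
while let Some(next_msg) = {
    if next_a.is_none() {
        next_a = chan_a.recv().await; // None once A is closed and drained
    }
    if next_b.is_none() {
        next_b = chan_b.recv().await; // None once B is closed and drained
    }
    match (&next_a, &next_b) {
        // Smaller timestamp wins; ties go to A.
        (Some(a), Some(b)) if a.timestamp <= b.timestamp => next_a.take(),
        (Some(_), None) => next_a.take(),
        // B is smaller, or only B has a message, or both are closed (yields None and ends the loop).
        _ => next_b.take(),
    }
} {
    // handle next message here
}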
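To see what this loop does, here is a synchronous analogue using `std::sync::mpsc`, where the blocking `recv()` stands in for `recv().await`. `Msg` and `merge_wait_for_both` are hypothetical names for illustration. Because each empty slot is refilled with a blocking call, a message is only emitted once both slots are full or a channel has closed.

```rust
use std::sync::mpsc::Receiver;

// Hypothetical message type with a timestamp and the channel it came from.
#[derive(Debug, PartialEq)]
struct Msg {
    timestamp: u64,
    source: char,
}

// "Wait for both" merge: blocks on each empty slot before choosing.
fn merge_wait_for_both(rx_a: Receiver<Msg>, rx_b: Receiver<Msg>) -> Vec<Msg> {
    let (mut next_a, mut next_b) = (None, None);
    let mut out = Vec::new();
    loop {
        if next_a.is_none() {
            next_a = rx_a.recv().ok(); // Err only once A is disconnected and drained
        }
        if next_b.is_none() {
            next_b = rx_b.recv().ok();
        }
        let next = match (&next_a, &next_b) {
            (Some(a), Some(b)) if a.timestamp <= b.timestamp => next_a.take(),
            (Some(_), None) => next_a.take(),
            _ => next_b.take(),
        };
        match next {
            Some(msg) => out.push(msg),
            None => break, // both channels closed and drained
        }
    }
    out
}
```

With A holding timestamps 1 and 4 and B holding 2 and 4 (both senders dropped), the output order is A1, B2, A4, B4: ordering is correct because the loop always waits until it can compare, which is also why a lone message on one channel is not processed while the other channel is open but empty.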

Could you clarify a few behavioral details?

  1. Immediate processing: When only one channel has data available right now, do you want to process it immediately without waiting for the other channel? Or is it okay to wait/block until you can peek at both channels?

  2. Buffering: When both channels have data and you take the one with the smaller timestamp, what should happen to the other message? Should it be:

    • Buffered/saved for the next iteration?
    • Dropped/ignored?
    • Or does this never happen in your use case (messages always arrive one at a time)?
  3. Channel closure: What should happen when one channel closes but the other is still active? Should you:

    • Continue processing messages from the remaining open channel?
    • Stop processing entirely?
  4. Starvation tolerance: If channel A is very busy with many messages with early timestamps, is it acceptable for channel B's messages to wait, or do you need some fairness guarantee?

These details will help determine whether you need true non-blocking look-ahead with tokio::select!, or if a buffering approach like Alice suggested would work for your use case.


Hey, thanks for the questions.

  1. Yes, if only one message is present, it should be processed immediately.
  2. It should be buffered. In the next iteration, if it's the only message, it should be processed immediately; if there is a message in the other channel, the usual rules apply, i.e. smallest timestamp first and, on a tie, channel A.
  3. If at least one channel is open, processing should continue, though in my case this will probably not happen.
  4. The channels are bounded, and if channel A is flooded with messages, it's OK to drop the oldest message in channel B when that channel gets full.

Hi, thanks for replying.

I'm a bit confused about what's happening here. If we enter the loop and there is no message available in either channel, we stop at next_a = chan_a.recv().await and wait there until a message appears in channel A, and meanwhile we never check what's happening in channel B. Am I right?
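That reading matches the code: the second `recv().await` is not reached until the first one completes. A small synchronous sketch with `std::sync::mpsc` makes this visible; `sequential_read` is a hypothetical helper, and `recv_timeout` bounds the wait on A only so that the demo terminates.

```rust
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::Duration;

// Mirrors `chan_a.recv().await` followed by `chan_b.recv().await`:
// A is read first, and B is only looked at afterwards.
fn sequential_read(
    rx_a: &Receiver<u64>,
    rx_b: &Receiver<u64>,
    wait: Duration,
) -> (Result<u64, RecvTimeoutError>, Option<u64>) {
    // While this call waits on A, nothing inspects B, even if B already has data.
    let from_a = rx_a.recv_timeout(wait);
    // Only now is B checked.
    let from_b = rx_b.try_recv().ok();
    (from_a, from_b)
}
```

If B already holds a message while A's sender is alive but idle, the call spends the whole timeout on A before noticing B's message.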

Hi, thanks for replying. There is an issue with the solution you suggested. If both channels have a message, the priority is indeed correct. Right after that, though, the other buffered message is processed, even though the channel we just took from may already hold another message with a smaller timestamp.

Thanks for the clarification! Based on your requirements, Alice's solution unfortunately doesn't quite work because it will block waiting for channel A even when channel B has data ready.

You need a non-blocking look-ahead approach. Here's a solution using tokio::select! with biased mode:

let mut buffer_a = None;
let mut buffer_b = None;

loop {
    // Try to process buffered messages first with priority rules
    if let (Some(a), Some(b)) = (&buffer_a, &buffer_b) {
        let msg = if a.timestamp <= b.timestamp {
            buffer_a.take().unwrap()
        } else {
            buffer_b.take().unwrap()
        };
        process(msg).await;
        continue;
    }

    // If only one buffer has data, process it immediately
    if let Some(msg) = buffer_a.take().or_else(|| buffer_b.take()) {
        process(msg).await;
        continue;
    }

    // Both buffers empty - wait for next message from either channel
    tokio::select! {
        biased;  // Ensures consistent polling order

        result = chan_a.recv(), if buffer_a.is_none() => {
            match result {
                Some(msg) => buffer_a = Some(msg),
                None => {
                    // Channel A closed, continue with just B
                    if chan_b.is_closed() { break; }
                }
            }
        }

        result = chan_b.recv(), if buffer_b.is_none() => {
            match result {
                Some(msg) => buffer_b = Some(msg),
                None => {
                    // Channel B closed, continue with just A
                    if chan_a.is_closed() { break; }
                }
            }
        }
    }
}

Key points:

  • The if guards on select! branches prevent polling channels when buffers are full
  • Buffers are checked first, so available messages are processed immediately
  • Continues processing from remaining channel when one closes
  • biased mode ensures deterministic polling order (A before B)

Does this match what you're looking for?

Hm, I think not.

If at the beginning of the loop at least one of the two buffers is non-empty, then we hit continue and never reach select!, right? So by the time we do reach select!, both if guards are always true.

The problem I see is that if both buffers hold a message, the one with the smaller timestamp is processed, and in the next iteration the other buffered message is processed without checking the channel we just took from, which may already hold a message with an even smaller timestamp.

You're right. I think this addresses that:

let mut buffer_a = None;
let mut buffer_b = None;

loop {
    // Refill empty buffers non-blockingly (look-ahead on both channels)
    if buffer_a.is_none() {
        buffer_a = chan_a.try_recv().ok();
    }
    if buffer_b.is_none() {
        buffer_b = chan_b.try_recv().ok();
    }
    
    // If both are empty, wait for at least one message
    if buffer_a.is_none() && buffer_b.is_none() {
        tokio::select! {
            biased;
            // A closed channel yields None, which fails the `Some(..)` pattern and
            // disables that branch for this select!, so we keep waiting on the other one.
            Some(msg) = chan_a.recv() => buffer_a = Some(msg),
            Some(msg) = chan_b.recv() => buffer_b = Some(msg),
            // Both channels closed and drained
            else => break,
        }
        // After select, go back and look ahead on the other channel with try_recv
        continue;
    }
    
    // Process with priority rules: smaller timestamp first, ties go to A
    let msg = match (&buffer_a, &buffer_b) {
        (Some(a), Some(b)) if a.timestamp <= b.timestamp => buffer_a.take().unwrap(),
        (Some(_), Some(_)) => buffer_b.take().unwrap(),
        (Some(_), None) => buffer_a.take().unwrap(),
        (None, Some(_)) => buffer_b.take().unwrap(),
        (None, None) => unreachable!("both-empty case is handled by the select! above"),
    };
    
    process(msg).await;
}
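The non-blocking half of this loop can be tested without a runtime. The sketch below is a synchronous analogue, assuming `std::sync::mpsc::Receiver::try_recv` in place of tokio's `try_recv`, a hypothetical `Msg` type, and a hypothetical `next_ready` helper. It returns `None` exactly where the async loop would fall through to `select!` and wait.

```rust
use std::sync::mpsc::Receiver;

// Hypothetical message type with a timestamp and the channel it came from.
#[derive(Debug, PartialEq)]
struct Msg {
    timestamp: u64,
    source: char,
}

// Look ahead with try_recv on every empty buffer, then take the smaller
// timestamp (ties go to A). None means both buffers are empty right now.
fn next_ready(
    rx_a: &Receiver<Msg>,
    rx_b: &Receiver<Msg>,
    buffer_a: &mut Option<Msg>,
    buffer_b: &mut Option<Msg>,
) -> Option<Msg> {
    if buffer_a.is_none() {
        *buffer_a = rx_a.try_recv().ok();
    }
    if buffer_b.is_none() {
        *buffer_b = rx_b.try_recv().ok();
    }
    match (&*buffer_a, &*buffer_b) {
        (Some(a), Some(b)) if a.timestamp <= b.timestamp => buffer_a.take(),
        (Some(_), None) => buffer_a.take(),
        _ => buffer_b.take(),
    }
}
```

This covers the earlier objection: with A1 and B3 queued, the first call returns A1 and leaves B3 buffered; if A2 arrives before the next call, the refill picks it up and it is returned before B3.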

Yes! Thank you! That works, and it's elegant and clean.

The code I shared is for the opposite case: it always waits until both A and B have a message available (or are closed), then takes the message with the smaller timestamp.
