Pros/Cons of multiple channels + tokio::select vs single channel and expanding message enum variants

I was wondering about the practical implications of two different approaches to message passing through channels. If I have an actor that receives messages from multiple sources, I can either (1) use the tokio::select!() matching macro across multiple channels, or (2) define a single message enum covering all the potential underlying messages, use one mpsc channel with a cloned sender, and then match and delegate on the variant downstream of a single recv().


pub async fn run_reader(
    mut reader: Reader<Connected>,
    mut receiver: mpsc::Receiver<RegisteredReq>,
) {
    loop {
        tokio::select! {
            Ok(msg) = reader.read_msg() => {
                reader.handle_msg(msg).await;
            },
            Some(msg) = receiver.recv() => {
                reader.handle_registration(msg);
            },

            else => break,
        }
    }
}

or

pub async fn run_reader_2(mut reader: Reader, mut recv: Receiver<ExpandedEnum>) {
    while let Some(msg) = recv.recv().await {
        reader.interpret_msg(msg);
    }
}

One difference between the two strategies is that in the first case you can detect the channel being closed because the sender was dropped, whereas if you combine them into one channel, you only get a channel error when all senders are dropped. That might matter if you want to exit after only one failure/end-of-stream rather than after all.
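
For instance (a minimal sketch with two hypothetical channels), matching the None that recv() yields lets you react to one particular channel's senders being dropped:

use tokio::sync::mpsc;

async fn run(mut rx_a: mpsc::Receiver<u32>, mut rx_b: mpsc::Receiver<String>) {
    loop {
        tokio::select! {
            msg = rx_a.recv() => match msg {
                Some(m) => println!("a: {m}"),
                None => break, // all of rx_a's senders were dropped; exit now
            },
            msg = rx_b.recv() => match msg {
                Some(m) => println!("b: {m}"),
                None => break, // likewise for rx_b
            },
        }
    }
}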

On the other hand, having a single message enum means that it's much easier to forward, if you have a reason to do that.
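
For example (made-up message type standing in for the combined enum), the forwarder is one send per message, regardless of how many sources feed the channel:

use tokio::sync::mpsc;

// Hypothetical combined message type.
enum ExpandedEnum {
    Tcp(Vec<u8>),
    Registration(u64),
}

async fn forward(mut rx: mpsc::Receiver<ExpandedEnum>, tx: mpsc::Sender<ExpandedEnum>) {
    while let Some(msg) = rx.recv().await {
        if tx.send(msg).await.is_err() {
            break; // downstream receiver was dropped
        }
    }
}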

Thanks, those are good points. I was also curious whether there would be a performance difference between the two. If I had to guess, tokio::select! may be able to spread the work out more across the different threads in its thread pool, but depending on the nature of the downstream work, that could be good or it could come with some extra overhead.

No, select! just polls each future sequentially (though in random order, for fairness) on the current task. But if your message channels are so expensive that they need to be checked using additional threads, something has gone seriously wrong in the architecture or implementation.
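
If you want to observe or control that polling order, select! accepts a biased; option that polls the arms strictly top-to-bottom instead of shuffling them, still one after another on the current task. A minimal sketch with hypothetical channels:

use tokio::sync::mpsc;

async fn drain(mut high: mpsc::Receiver<u8>, mut low: mpsc::Receiver<u8>) {
    loop {
        tokio::select! {
            // `biased;` disables the random fairness shuffle: arms are
            // polled in the order written, sequentially on this task.
            biased;
            Some(b) = high.recv() => println!("high-priority: {b}"),
            Some(b) = low.recv() => println!("low-priority: {b}"),
            else => break, // both channels closed
        }
    }
}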

Speaking of which — reader.read_msg() doesn't sound like a channel receiver. Are you sure what you're calling from select! is cancellation safe? When select! picks a branch that is ready, all the other futures get cancelled, which can leave work inappropriately half-finished — for example, parsing a message out of an input stream.

If that applies to your situation, then that's a good reason to adopt the second option, so that the message parser can run in its own task that doesn't get cancelled and restarted.


That's a good question re: cancellation safety. I think it should be fine given that it's in a loop; the other channel will just queue up and be ready on the next iteration, unless I am misunderstanding some concept there. The implementation does seem to be working as is, so that is a good sign. Is the cancellation and restarting expensive at all? My guess is not.

Re: the threads comment, that could just be me being misinformed. The channels themselves shouldn't be expensive at all, but if each branch had its own thread, then the downstream processing of a message could happen while the other branch is being read on the next loop iteration. I don't know the implementation of select!, though; now that I think about it, it seems unlikely that it would be doing that anyway.

That's a good question re: cancellation safety. I think it should be fine given that it's in a loop; the other channel will just queue up and be ready on the next iteration, unless I am misunderstanding some concept there.

The issue is not with your loop but with whether read_msg() will leave some work half-completed when it's cancelled. I don't know what read_msg() does, so this is just a guess that it might be a problem, since it sounds like it might be more than just a channel.

Is the cancellation and restarting expensive at all? My guess is not.

Entirely depends on how expensive the expressions inside each arm are. But that and cancellation are both reasons you might want to make sure any substantial work is done independently rather than inside select!'s expressions.

Well I am glad I asked this question now! I did not realize the other future would be dropped entirely. I will refactor this to only pull the message out in the select macro and then pattern match on the message type outside of the select branches.

You were right to point out that read_msg is a different animal. It is basically pulling a parsed frame out of a TCP stream (and then delegating based on the message type). The Reader struct owns an OwnedReadHalf of a TcpStream.

    /// Reads and parses one frame from the owned read half; a single
    /// frame may span multiple reads from the socket.
    async fn read_msg(&mut self) -> Result<TCPMessage> {
        TCPMessage::read(&mut self.reader, &mut self.read_buffer).await
    }

My select! call can instead just return the msg, and then a subsequent match after the select can handle the processing branches that depend on the message type. This way only one thing is happening in each branch of the select call.
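
Roughly like this, with a made-up Event enum just to give both arms a single value type:

// Hypothetical wrapper so both select! arms evaluate to one type.
enum Event {
    Tcp(TCPMessage),
    Registration(RegisteredReq),
}

pub async fn run_reader(
    mut reader: Reader<Connected>,
    mut receiver: mpsc::Receiver<RegisteredReq>,
) {
    loop {
        // The arms only extract a value; no processing happens inside them.
        let event = tokio::select! {
            Ok(msg) = reader.read_msg() => Event::Tcp(msg),
            Some(msg) = receiver.recv() => Event::Registration(msg),
            else => break,
        };
        // All real work happens here, after an arm has already won.
        match event {
            Event::Tcp(msg) => reader.handle_msg(msg).await,
            Event::Registration(msg) => reader.handle_registration(msg),
        }
    }
}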

To ensure no pending futures are cancelled, I need to make sure each branch contains only a single await. Is this correct?

To ensure no pending futures are cancelled, I need to make sure each branch contains only a single await. Is this correct?

I'm not sure whether you understand correctly or not. To clear things up, let me establish some terms; the select! documentation doesn't make this distinction clearly, so I'll make some up:

select! {
    pat1 = candidate1 => handler1,
    pat2 = candidate2 => handler2,
}

I'm not saying that the handlers may be cancelled; I'm saying that the candidates may be cancelled (in particular, all but the one that is picked are cancelled). Once select! has picked one of its arms and started executing the handler, everything is like normal async block code; the problem is that the candidates all have to be run to find out which one succeeds.

So, your read_msg must not be one of the candidate expressions, because if it is cancelled part way through, it may already have read part of a frame out of the TCP stream, meaning that data is discarded and the next attempt to read will see partial data.

It's not sufficient to say "there's only one await" because TCPMessage::read might contain within itself multiple awaits or equivalent, and it might have had a relevant side effect before the first await. It might be that the reader is designed to be cancellation safe (I don't know if it's your code or a library), but you can't tell by just looking at the call to it.
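
To illustrate the hazard with a made-up framed read (not your code), here is the shape of a candidate that is not cancellation safe:

use tokio::io::{AsyncRead, AsyncReadExt};

// NOT cancellation safe: if this future is dropped after the first
// read_exact but before the second, the 4-byte length prefix has been
// consumed and lost, so the next read will misinterpret payload bytes
// as a fresh length header.
async fn read_frame<R: AsyncRead + Unpin>(r: &mut R) -> std::io::Result<Vec<u8>> {
    let mut len_buf = [0u8; 4];
    r.read_exact(&mut len_buf).await?;
    let len = u32::from_be_bytes(len_buf) as usize;
    let mut payload = vec![0u8; len];
    r.read_exact(&mut payload).await?; // cancellation between the awaits desynchronizes the stream
    Ok(payload)
}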

The way you can make this safe is by running the stream reading on a separate task, which turns the frames into messages on a channel.
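
A minimal sketch of that shape, where FrameReader is a made-up stand-in for whatever owns the OwnedReadHalf and the read buffer:

use tokio::sync::mpsc;

pub fn spawn_frame_reader(mut frames: FrameReader) -> mpsc::Receiver<TCPMessage> {
    let (tx, rx) = mpsc::channel(32);
    // The read loop lives on its own task, so each read future is driven
    // to completion rather than being cancelled by select!.
    tokio::spawn(async move {
        while let Ok(msg) = frames.read_msg().await {
            if tx.send(msg).await.is_err() {
                break; // the consuming actor is gone
            }
        }
    });
    rx
}

The actor can then receive from this channel alongside its other one, because mpsc::Receiver::recv() is documented as cancellation safe.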

Disclaimer: I haven't done serious, involved async programming, and I might have missed a detail or strategy here. I just have studied the Rust async model and its hazards.


Got it! Thanks for the explanation around handler vs candidate, that totally makes sense.

Understood re: read_msg not being a good candidate for a candidate expression. I did mean a single await across the entire call stack, not just what is visible. This is my own code, but I think your point stands: this is not going to work, given that a frame may end up taking multiple reads due to its size.

Great point about having all the reading done in a separate task, I will take this recommendation!

Thanks again!
Dave
