Infinite "loop" or "while let" are the same in terms of performances?

REPL (all the code): Rust Playground

In the below code I'm receiving from a channel and based on some logic I return Option<Message>.

impl Listener {
    pub async fn receive(&mut self) -> Option<Arc<Message>> {
        if let Ok(message) = self.receiver.recv().await {
            if message.team_id == self.team_id {
                println!("this is the issue, I'm returning None and it closes the stream! block");
                return None;
            }

            Some(message)
        } else {
            None
        }
    }
}

When I receive that message in the axum handler I'm using an infinite loop like this:

 let res = async_stream::stream! {
    loop {
        if let Some(msg) = receiver.receive().await {
            yield Ok(sse::Event::default().data(msg));
        }
    }
};

but I'm afraid that the infinite loop can be dangerout for performances.

First question

Is it ok to use infinite loop like this? Is it always trying to get data?

I'm scaring because I'll have tons of this function alive concurrently.

Second question

Is the while let version the same as the infinite loop?

I tried to use the below code but it closes the stream if the message is None. Why?

Should I use Result<> instead? And how?

let s = async_stream::stream! {
    while let Some(msg) = receiver.receive().await {
            yield Ok(sse::Event::default().data(msg));
    }
};

Usually, streams only return None when they reach the end, which means that they will return None forever if you keep calling it. You should probably follow that pattern by not returning None in Listener::receive to skip a message. Instead use a loop to just wait for the next message.

As another thing, if let Ok is a major red flag. It usually means that you aren't properly handling errors.

3 Likes

Thanks for this answer.

You should probably follow that pattern by not returning None in Listener::receive to skip a message. Instead use a loop to just wait for the next message.

This is my doubt: how to?

Do you mean this?

let s = async_stream::stream! {
    loop {
        receiver.receive().await {
            yield Ok(Event::default().data(msg));
        }
    }
};

The thing is I have a logic inside receive() and I don't know how to write it differently:

I can change this:

impl Listener {
    pub async fn receive(&mut self) -> Option<Arc<Message>> {
        if let Ok(message) = self.receiver.recv().await {
            if message.team_id == self.team_id {
                return None;
            }

            Some(message)
        } else {
            None
        }
    }
}

to this:

impl Listener {
    pub async fn receive(&mut self) -> Arc<Message> {
        match self.receiver.recv().await {
            Ok(message) => {
                if message.team_id == self.team_id {
                    // ---> BUT WHAT TO RETURN HERE NOW?
                    return;
                }

                message
            }
            Err(err) => {
                // handle this err
            }
        }
    }
}

And for:

As another thing, if let Ok is a major red flag. It usually means that you aren't properly handling errors.

Do you mean I should use a match instead?

I was thinking something along these lines:

impl Listener {
    pub async fn receive(&mut self) -> Option<Arc<Message>> {
        while let Ok(message) = self.receiver.recv().await {
            if message.team_id == self.team_id {
                continue;
            }

            return Some(message);
        }
        None
    }
}

Thanks. It works but this doesn't solve my doubts:

  1. I do not handle the Err part

  2. It doesn't solve the axum handler skipping part

The self.receiver.recv().await call seems to return a Result. What is this method? Did you write it yourself?

Not sure what you mean with axum.

Yep. The code is here: Rust Playground.

Ohhhhhh, it's a broadcast channel. Yeah, you definitely can't do if let Ok on a broadcast channel without understand what kind of errors it can have.

Here is the error type of broadcast::Receiver::recv:

pub enum RecvError {
    Closed,
    Lagged(u64),
}

link to documentation

The first error happens when the channel is closed. This means that there are no more senders for the channel. The second error happens when the receiver can't keep up, and some messages was deleted because of that. You should probably handle the errors in different ways.

For example, you might do this:

impl Listener {
    pub async fn receive(&mut self) -> Option<Arc<Message>> {
        loop {
            match self.receiver.recv().await {
                Ok(msg) if msg.team_id == self.team_id => continue,
                Ok(msg) => return Some(msg),
                Err(RecvError::Lagged(num_skipped)) => {
                    // How would you like to handle this error case?
                    // Here, I just print and keep going.
                    println!("Lost {num_skipped} messages.");
                },
                Err(RecvError::Closed) => return None,
            }
        }
    }
}
1 Like

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.