Usually, streams only return None when they reach the end, which means they will keep returning None forever if you keep calling them. You should probably follow that pattern and not return None from Listener::receive just to skip a message. Instead, use a loop to wait for the next message.
Also, if let Ok is a major red flag. It usually means that you aren't handling errors properly, because every error case is silently ignored.
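To illustrate why if let Ok hides information, here is a minimal sketch that uses the standard library's std::sync::mpsc channel as a stand-in (it is not the channel from this thread). The Poll enum and the poll and demo functions are hypothetical names made up for the example. With if let Ok, the "nothing yet" and "channel is gone" cases would look identical; a match keeps them apart.

```rust
use std::sync::mpsc::{channel, Receiver, TryRecvError};

// Hypothetical result type for this sketch: one variant per outcome.
#[derive(Debug, PartialEq)]
pub enum Poll {
    Message(String),
    NothingYet,
    Finished,
}

// Match on every case instead of `if let Ok(..)`, so that an empty
// channel and a disconnected channel are not lumped together.
pub fn poll(rx: &Receiver<String>) -> Poll {
    match rx.try_recv() {
        Ok(text) => Poll::Message(text),
        Err(TryRecvError::Empty) => Poll::NothingYet,
        Err(TryRecvError::Disconnected) => Poll::Finished,
    }
}

// Small usage example: poll before sending, after sending,
// and after the only sender has been dropped.
pub fn demo() -> Vec<Poll> {
    let (tx, rx) = channel::<String>();
    let mut seen = Vec::new();
    seen.push(poll(&rx));
    tx.send("hi".to_string()).unwrap();
    seen.push(poll(&rx));
    drop(tx);
    seen.push(poll(&rx));
    seen
}
```

The two error arms can then be handled differently, for example by retrying later in one case and shutting down in the other.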
You should probably follow that pattern by not returning None in Listener::receive to skip a message. Instead use a loop to just wait for the next message.
This is my question: how do I do that?
Do you mean this?
let s = async_stream::stream! {
    loop {
        if let Some(msg) = receiver.receive().await {
            yield Ok(Event::default().data(msg));
        }
    }
};
The thing is, I have some logic inside receive() and I don't know how to write it differently:
The first error (RecvError::Closed) happens when the channel is closed. This means that there are no more senders for the channel. The second error (RecvError::Lagged) happens when the receiver can't keep up, and some messages were dropped because of that. You should probably handle the two errors in different ways.
For example, you might do this:
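// Assumes a tokio broadcast channel, whose RecvError has these two variants.
use std::sync::Arc;
use tokio::sync::broadcast::error::RecvError;

impl Listener {
    pub async fn receive(&mut self) -> Option<Arc<Message>> {
        loop {
            match self.receiver.recv().await {
                // Skip this message and wait for the next one.
                Ok(msg) if msg.team_id == self.team_id => continue,
                Ok(msg) => return Some(msg),
                Err(RecvError::Lagged(num_skipped)) => {
                    // How would you like to handle this error case?
                    // Here, I just print and keep going.
                    println!("Lost {num_skipped} messages.");
                },
                // No more senders: this is the real end of the stream.
                Err(RecvError::Closed) => return None,
            }
        }
    }
}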
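If it helps to see the behaviour in isolation, here is a synchronous sketch of the same loop that runs without any async runtime. Everything in it is a hypothetical stand-in: FakeReceiver replays a scripted list of results, and Message, RecvError, the lost counter and the drain helper are invented for the example rather than taken from your code or from any library. The point is only that receive skips filtered messages and lag errors inside the loop, and returns None only once the channel is closed.

```rust
use std::collections::VecDeque;
use std::sync::Arc;

// Hypothetical stand-ins for the types in this thread.
#[derive(Debug, PartialEq)]
pub struct Message {
    pub team_id: u32,
    pub text: String,
}

#[derive(Debug)]
pub enum RecvError {
    Lagged(u64),
    Closed,
}

// Scripted receiver: replays the queued results, then reports Closed forever.
pub struct FakeReceiver {
    pub script: VecDeque<Result<Arc<Message>, RecvError>>,
}

impl FakeReceiver {
    pub fn recv(&mut self) -> Result<Arc<Message>, RecvError> {
        self.script.pop_front().unwrap_or(Err(RecvError::Closed))
    }
}

pub struct Listener {
    pub team_id: u32,
    pub receiver: FakeReceiver,
    pub lost: u64, // running total of messages reported as lagged
}

impl Listener {
    // Same shape as the async version above, minus the `.await`.
    pub fn receive(&mut self) -> Option<Arc<Message>> {
        loop {
            match self.receiver.recv() {
                Ok(msg) if msg.team_id == self.team_id => continue,
                Ok(msg) => return Some(msg),
                Err(RecvError::Lagged(n)) => self.lost += n,
                Err(RecvError::Closed) => return None,
            }
        }
    }
}

// Consumer side: `while let` stops exactly when the channel is closed.
pub fn drain(listener: &mut Listener) -> Vec<String> {
    let mut out = Vec::new();
    while let Some(msg) = listener.receive() {
        out.push(msg.text.clone());
    }
    out
}

// Convenience constructor for the example.
pub fn msg(team_id: u32, text: &str) -> Arc<Message> {
    Arc::new(Message { team_id, text: text.to_string() })
}
```

With this shape, the caller never has to guess whether None means "skipped" or "finished", so a plain while let loop on the consumer side is enough.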