Stopping a filtered Iterator

#1

I read messages from a socket using an iterator. those messages will be parsed and the result shall be passed as another iterator to a consumer. There are four possible reactions for each message:

  1. IO read error -> abort with error
  2. the message is of wrong type -> skip
  3. the message is of type close -> stop iterating
  4. the message is valid -> return message

My Iterator returns a Result<> to handle 1. and 4. but it would be nice if the iterator behaves like a drained one after the first error.

I solved 2. by using filter_map().

I still need a solution for 3. I guess it could be done with a take_while() but I don’t know how to write that without introducing helper enums.

    let map_filter = |wsr: WebSocketResult<OwnedMessage>| -> Option<Result<MessageData, Error>> {
        match wsr {
            Ok(raw_ws_message) => {
                match raw_ws_message {
                    OwnedMessage::Binary(data) => { // this is fine
                        Some(Ok(MessageData::new(data)))
                    }
                    OwnedMessage::Text(text) => { // this shall be skipped
                        None
                    }
                    OwnedMessage::Close(_) => { // this shall end the iteration gracefully
                        None
                    }
                }
            },
            Err(err) => { // this shall return an error (and it would optimally end the iteration afterwards)
                Some(Err(Error::from(err)))
            },
        }
    };

    let mut message_data_stream = raw_ws_message_stream.filter_map(map_filter);

    for message_data in message_data_stream {
        worker.handle_message(message_data?);
    }

My solution already feels cumbersome and would become even worse without help.

0 Likes

#2

Ok, I have some kind of a solution by inserting another Option-layer to control a take_while:

    let map_filter = |wsr: WebSocketResult<OwnedMessage>| -> Option<Option<Result<MessageData, Error>>> {
        match wsr {
            Ok(raw_ws_message) => {
                match raw_ws_message {
                    OwnedMessage::Binary(data) => {
                        Some(Some(Ok(message)))
                    }
                    OwnedMessage::Text(text) => {
                        None
                    }
                    OwnedMessage::Close(_) => {
                        Some(None)
                    }
                }
            },
            Err(err) => {
                Some(Some(Err(Error::from(err))))
            },
        }
    };

    let mut message_data_stream = raw_ws_message_stream.filter_map(map_filter).take_while(|m| m.is_some()).map(|m| m.unwrap());

Can this take_while(|m| m.is_some()).map(|m| m.unwrap()) be optimized?

0 Likes

#3

Itertools::while_some.

Edit: more generally, don’t forget that you can always just define your own iterator adapters.

1 Like

#4

Thank you - I totally forgot about itertools. I also found fold_results there which seems to solve my stop-after-first Err problem.

Edit: No, it doesn’t - I believe the complexity demands writing an own adapter as suggested.

0 Likes