Propagate error while iterating over lines

I'm trying to write a function that iterates through the lines of text received from a TcpStream, returning an io::Result whose Ok value is a String holding the concatenation of these lines up to but not including the first empty line encountered.

For example, if the TcpStream returned:
Line1\n
Line2\n
Line3\n
\n
Line4\n
Line5\n

Assuming no error occurred reading from the stream, this function would return:

Ok("Line1\nLine2\nLine3".to_string())

If an error occurred reading from the stream, this function would return the IO error.

I'm trying to implement this function by stringing together functions on the Iterator trait, instead of using a 'for' loop. Here's what I have so far, which isn't much because I don't know how to deal with the possibility of an error in reading from the stream.

fn get_first_non_empty_lines(server: &str, port: u16) -> io::Result<String> {
  let stream = TcpStream::connect((server, port))?;
  let reader = BufReader::new(stream);
  let data = reader.lines()
    // What iterator functions to call here so that any error encountered
    // stops the iteration and is returned?
}

I tried using Iterator::take_while, looking for non-empty lines, but the closure I pass in needs to return a bool, so I don't know how to deal with an error in reading from the stream. The same goes for fold -- I don't know how to deal with a possible error.

Thanks in advance for any help.

You're trying to produce one value from many iterator items, so you're looking for something like fold/reduce/for_each. You also want it to be able to exit early, so look for a method that takes &mut self (an early-exiting method shouldn't consume the iterator, because then you couldn't pick up again where it stopped). That leads you to Iterator::try_fold or its sibling Iterator::try_for_each.
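As a rough sketch of that early-exit idea (not code from this thread), try_fold can be driven with std::ops::ControlFlow so that both an I/O error and the first empty line stop the fold. The function name fold_until_blank is made up for illustration, and the sketch assumes the blank line should simply be dropped.

```rust
use std::io::{self, BufRead};
use std::ops::ControlFlow;

// Hypothetical helper: joins lines with "\n" until the first empty line,
// stopping early on either an empty line or an I/O error.
fn fold_until_blank(reader: impl BufRead) -> io::Result<String> {
    let flow: ControlFlow<io::Result<String>, String> =
        reader.lines().try_fold(String::new(), |mut acc, line| match line {
            // An error ends the fold and is carried out in the Break value.
            Err(e) => ControlFlow::Break(Err(e)),
            // The first empty line also ends the fold, with what we have so far.
            Ok(l) if l.is_empty() => ControlFlow::Break(Ok(acc)),
            Ok(l) => {
                if !acc.is_empty() {
                    acc.push('\n');
                }
                acc.push_str(&l);
                ControlFlow::Continue(acc)
            }
        });
    match flow {
        ControlFlow::Break(result) => result,
        // The input ended without an empty line.
        ControlFlow::Continue(acc) => Ok(acc),
    }
}
```

Since the function takes any impl BufRead, a BufReader wrapped around a TcpStream can be passed in directly.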

I would probably use BufRead::read_line in a loop in this case. Some care would be needed around what exactly you want your newline semantics to be (e.g. if your input has \r\n newlines like a standard HTTP header, and you want to throw out the \rs).
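A minimal sketch of what such a read_line loop might look like, assuming that a trailing "\n" or "\r\n" should be stripped from each line and that the empty line itself may be consumed from the reader. The name read_until_blank is hypothetical.

```rust
use std::io::{self, BufRead};

// Hypothetical helper: reads lines with BufRead::read_line until the first
// empty line or end of input, joining them with "\n".
fn read_until_blank(reader: &mut impl BufRead) -> io::Result<String> {
    let mut out = String::new();
    let mut line = String::new();
    loop {
        line.clear();
        // read_line reports both I/O errors and invalid UTF-8 as io::Error.
        if reader.read_line(&mut line)? == 0 {
            break; // end of input
        }
        // Strip one trailing "\n" and, if present, the "\r" before it.
        let content = line.strip_suffix('\n').unwrap_or(line.as_str());
        let content = content.strip_suffix('\r').unwrap_or(content);
        if content.is_empty() {
            break; // first empty line; note that it has been consumed
        }
        if !out.is_empty() {
            out.push('\n');
        }
        out.push_str(content);
    }
    Ok(out)
}
```

Because the reader is borrowed rather than consumed, the caller can keep reading whatever follows the empty line afterwards.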

In your case, a simple for loop with the ? operator would probably work:

let mut out = String::new();
for line in reader.lines() {
    let line = line?; // handle error
    out.push_str(&line);
}
Ok(out)

and .try_fold is fine too.

For iterators, .collect() knows how to aggregate results. If you can collect Foo<Result<T, E>>, then you can also collect Result<Foo<T>, E> which will stop on the first error (where Foo can be a Vec, HashMap, etc.)
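Here is a small sketch of how that collect behaviour could be combined with take_while for the original question, assuming the lines should be joined with "\n". The name collect_until_blank is made up for illustration. The take_while predicate lets Err items through so that collect is the one that stops on them.

```rust
use std::io::{self, BufRead};

// Hypothetical helper: collects lines up to the first empty one into a Vec,
// bailing out on the first error, then joins them with "\n".
fn collect_until_blank(reader: impl BufRead) -> io::Result<String> {
    let lines = reader
        .lines()
        // Stop only on a successfully read empty line; errors pass through.
        .take_while(|line| !matches!(line, Ok(l) if l.is_empty()))
        // Collecting into Result<Vec<_>, _> stops at the first Err.
        .collect::<io::Result<Vec<String>>>()?;
    Ok(lines.join("\n"))
}
```

This builds an intermediate Vec, which is fine for something the size of a header block but is worth keeping in mind for large inputs.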


kornel's example translates pretty well to try_for_each:

pub fn demo(reader: impl BufRead) -> io::Result<String> {
    let mut out = String::new();
    reader.lines().try_for_each(|line| -> io::Result<_> {
        let line = line?; // handle error
        out.push_str(&line);
        Ok(())
    })?;
    Ok(out)
}

https://play.rust-lang.org/?version=beta&mode=debug&edition=2021&gist=9b522e41a9ed39585bd35bd896364990

But might as well use for instead of the iterator approaches unless you have some other thing making the iterator version a better fit.

Note that the examples given aren't handling stopping on blank lines or keeping input lines separated by "\n".
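For illustration, here is one way the for loop could be adjusted to cover both points. This is a sketch rather than anyone's posted code, and the name lines_until_blank is hypothetical.

```rust
use std::io::{self, BufRead};

// Hypothetical variant of the for loop: stops at the first empty line and
// keeps the collected lines separated by "\n".
fn lines_until_blank(reader: impl BufRead) -> io::Result<String> {
    let mut out = String::new();
    for line in reader.lines() {
        let line = line?; // propagate any read error to the caller
        if line.is_empty() {
            break; // first empty line ends the result
        }
        if !out.is_empty() {
            out.push('\n'); // separator between lines, none at the end
        }
        out.push_str(&line);
    }
    Ok(out)
}
```

BufRead::lines strips the trailing "\n" or "\r\n" from each item, so a line containing only "\r\n" also counts as empty here.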

I had to use BufRead::fill_buf / BufRead::consume instead of BufRead::read_line so I could peek at the empty line without consuming it from the reader.

Here's the solution I came up with:


fn up_to_first_empty_line<T: BufRead>(reader: &mut T) -> Result<String, ReadLinesError> {
    let mut result = Vec::new();
    loop {
        let buf = reader.fill_buf()?;
        if buf.is_empty() {
            // No more data in the stream buffer (end of iteration).
            break;
        }
        let newline_index = buf.windows(2).position(|pair| pair == b"\r\n");
        let consume_amt;
        match newline_index {
            Some(index) => {
                let (line, _) = buf.split_at(index);
                if line.is_empty() {
                    // Just peek at the stream buffer. Don't consume the empty line.
                    break;
                } else {
                    consume_amt = Some(line.len() + 2);
                    result.extend_from_slice(line);
                    result.extend_from_slice(b"\n");
                }
            },
            // No "\r\n" found in the currently buffered data, so stop.
            None => break,
        };
        // Consume the appropriate number of bytes in the stream buffer, if any.
        if let Some(amt) = consume_amt {
            reader.consume(amt)
        }
    }
    Ok(String::from_utf8(result)?)
}

This solution assumes the input lines are separated by "\r\n" like the headers in an HTTP response.

I've been looking at this all week, and I don't believe there's a simpler way to accomplish this by chaining together iterator methods while sticking to the following requirements:

  1. Stop at the first empty line.
  2. Leave the first empty line in the stream buffer.
  3. Stop the iteration if an error occurs, propagating the error back to the caller.

I had to create my own error type, ReadLinesError, to use the '?' operator in my function. For completeness, here's my implementation of ReadLinesError.

use std::{error, fmt, io, string};

#[derive(Debug)]
enum ReadLinesError {
    Io(io::Error),
    Utf8(string::FromUtf8Error),
}

impl error::Error for ReadLinesError {}

impl fmt::Display for ReadLinesError {
    fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
        match self {
            ReadLinesError::Io(e) => e.fmt(f),
            ReadLinesError::Utf8(e) => e.fmt(f),
        }
    }
}

impl From<io::Error> for ReadLinesError {
    fn from(e: io::Error) -> Self {
        ReadLinesError::Io(e)
    }
}

impl From<string::FromUtf8Error> for ReadLinesError {
    fn from(e: string::FromUtf8Error) -> Self {
        ReadLinesError::Utf8(e)
    }
}

Because I had to use a raw loop and methods on the BufRead trait to accomplish my task, I'm marking @quinedot's response as the solution. Thanks so much to everyone for their responses!

Here's about what I had in mind, though it may need tuning for the corner cases.

An alternative is to piggy-back on top of std::io::Error:

pub fn up_to_first_empty_line<T: BufRead>(reader: &mut T) -> Result<String, std::io::Error> {
    // ...

    let string = String::from_utf8(result).map_err(|e| {
        let kind = std::io::ErrorKind::InvalidData;
        std::io::Error::new(kind, e)
    })?;

    Ok(string)
}

I like the idea of using BufRead::read_line because it wraps both I/O related errors and UTF-8 errors in an io::Error. Your solution feels much cleaner and simpler than mine...mine felt very error-prone while I was piecing it together. The one thing your solution doesn't do is leave the empty line in the reader, but that's really not a sensible requirement the more I think about it.

Thanks for the suggestion regarding map_err. I hadn't thought about using it like this to avoid having to create a boilerplate custom error.
