Tokio/Reqwest Byte Stream to Lines

Hi

I'm receiving streamed chunks of newline delimited JSON data from a website. Occasionally, the data will span multiple chunks, so a chunk is not necessarily a whole JSON object. So, rather than parse each chunk, I need to parse each line as it arrives (possibly over multiple chunks).

I would like to await each line. What I'm missing is how to convert the response byte stream into lines.

The code below results in multiple "trait bound not satisfied" errors.

Any suggestions/help would be much appreciated.

use std::error::Error;

use tokio::io::AsyncBufReadExt;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {

    let client = reqwest::Client::new();

    let response = client.get("STREAMING_URL")
        .send()
        .await?
        .bytes_stream();

    // This doesn't work!
    let mut lines = response.lines();

    while let Some(line) = lines.next_line().await? {
        println!("length = {}", line.len())
    }

    Ok(())
}

So your problem is that you have something that implements Stream<Item = Result<Bytes>>, but you need something that implements AsyncBufRead. Luckily, such a conversion can be found in the tokio-util crate as tokio_util::io::StreamReader. Once you wrap the response stream in a StreamReader, calling .lines() on the reader will work.
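To make the chunk-versus-line distinction concrete, here is a small std-only sketch of the buffering involved. It is not how StreamReader is implemented; LineBuffer and push are hypothetical names used purely for illustration, and the sketch assumes lines end in a single '\n' byte.

```rust
/// Hypothetical helper: accumulates raw chunks and returns every
/// complete line seen so far. Bytes after the last '\n' stay buffered
/// until a later chunk completes them.
#[derive(Default)]
pub struct LineBuffer {
    pending: Vec<u8>,
}

impl LineBuffer {
    /// Feed one chunk; get back zero or more complete lines
    /// (without their trailing newline).
    pub fn push(&mut self, chunk: &[u8]) -> Vec<String> {
        self.pending.extend_from_slice(chunk);

        let mut lines = Vec::new();
        // Pull out one line per newline currently in the buffer.
        while let Some(pos) = self.pending.iter().position(|&b| b == b'\n') {
            // Remove the line and its newline from the front of the buffer.
            let raw: Vec<u8> = self.pending.drain(..=pos).collect();
            // Drop the trailing '\n' before converting to text.
            let text = String::from_utf8_lossy(&raw[..raw.len() - 1]).into_owned();
            lines.push(text);
        }
        lines
    }
}
```

A chunk that ends in the middle of a JSON object yields nothing for that object until the rest arrives. StreamReader combined with .lines() gives you the same behaviour in async form, so you do not need to write this yourself.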

You may run into some errors regarding the error type. In that case, you can do this:

use futures::stream::TryStreamExt; // for map_err
use tokio_util::io::StreamReader;

fn convert_err(err: reqwest::Error) -> std::io::Error { todo!() }

let reader = StreamReader::new(response.map_err(convert_err));

Thank you Alice

This is the pointer I needed.

From the tokio_util::io documentation...

"These types are often used in combination with hyper or reqwest, as they allow converting between a hyper Body and AsyncRead."

This is exactly what I'm trying to do!

Thank you.

It is also exactly the purpose I originally wrote StreamReader for!

Hi Alice

Thanks for your suggestion to use StreamReader. I'm a Rust newbie, and although I know I need to convert the reqwest::Error to a std::io::Error using map_err, I'm still puzzled how to do so.

I've been trawling the internet for examples of how to use map_err but I'm still puzzled.

Any hints/suggestions would be very welcome.

The snippet I posted before should work:

use futures::stream::TryStreamExt; // for map_err

fn convert_err(err: reqwest::Error) -> std::io::Error { todo!() }

let reader = StreamReader::new(response.map_err(convert_err));
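For readers wondering what could go in place of todo!(), one possible body is sketched below. It is written generically so that it compiles with the standard library alone. The generic bound is an assumption made for illustration; a version taking reqwest::Error directly would have the same one-line body, since reqwest::Error is a Send + Sync error type.

```rust
use std::error::Error;
use std::io;

/// Wraps any boxable error in a std::io::Error of kind `Other`,
/// keeping the original error as the inner source.
/// Generic stand-in for `fn convert_err(err: reqwest::Error) -> std::io::Error`.
pub fn convert_err<E>(err: E) -> io::Error
where
    E: Into<Box<dyn Error + Send + Sync>>,
{
    io::Error::new(io::ErrorKind::Other, err)
}
```

With this in place, the call from the snippet above, response.map_err(convert_err), should need no changes. The ErrorKind chosen here is a judgment call; Other is simply the catch-all.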

Apologies and doh! With my newbie head on I read todo!() as "implement this here". My code now works.

Thank you!


Hi Alice.

Having read up on todo!(), I feel even more stupid. Thanks for your help. I can imagine it’s extremely frustrating answering such dumb questions.

Kind regards
