I'm receiving streamed chunks of newline delimited JSON data from a website. Occasionally, the data will span multiple chunks, so a chunk is not necessarily a whole JSON object. So, rather than parse each chunk, I need to parse each line as it arrives (possibly over multiple chunks).
I would like to await on each line. What I'm missing, is how to convert the response byte stream to lines.
The code below results in multiple trait bounds not satisfied errors.
Any suggestions/help would be much appreciated.
use std::error::Error;
use tokio::io::AsyncBufReadExt;
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
let client = reqwest::Client::new();
let response = client.get("STREAMING_URL")
.send()
.await?
.bytes_stream();
// This doesn't work!
let mut lines = response.lines();
while let Some(line) = lines.next_line().await? {
println!("length = {}", line.len())
}
Ok(())
}
So your problem is that you have something that implements Stream<Item = Result<Bytes>>, but you need something that implements AsyncBufRead. Luckily, such a conversion can be found in the tokio-util crate as tokio_util::io::StreamReader. Calling .lines() on it will work once you wrap it in a StreamReader.
You may run into some errors regarding the error type. In that case, you can do this:
use futures::stream::TryStreamExt; // for map_err
fn convert_err(err: reqwest::Error) -> std::io::Error { todo!() }
let reader = StreamReader::new(response.map_err(convert_err));
Thanks for your suggestion to use StreamReader. I'm a Rust newbie, and although I know I need to convert the Request::Error to a std::io::Error using map_err, I'm still puzzled how to do so.
I've been trawling the internet for examples of how to use map_err but I'm still puzzled.