Parsing Actix Client response line by line asynchronously

Hello there,
I'm currently working on a project that uses Actix to retrieve JSON objects from a server, process them, and POST them to another server. It works fine when the number of JSON objects is small. However, if the response body is around 20,000 lines, I get an Overflow error from Actix. Is there a way to read the body line by line and process it asynchronously, i.e. parse each line and send it to the API while the response is still being received? Thanks in advance.

I'm using the Actix client crate (awc) for sending and receiving.
For reference, here's my code snippet:

    let client = Client::new();

    // Getting the payload from the server.
    // .body() collects the whole response into memory before returning.
    let payload = client.request(Method::GET, uri)
                        .basic_auth(username, password)
                        .query(params)
                        .unwrap()
                        .send()
                        .await
                        .unwrap()
                        .body()
                        .await
                        .unwrap();

    let lines = std::str::from_utf8(&payload).unwrap();

    // a new client to post to the api
    let client = Client::new();
    for line in lines.lines() {
        // transform the source data to api format
        let parsed: serde_json::Value = serde_json::from_str(line).unwrap();

        let r = client.post("my-api-uri")
                      .send_json(&parsed)
                      .await
                      .unwrap();
    }

If you have an object that implements the Stream trait with an item type of Result<Bytes, SomeError>, as ClientResponse seems to (I assume this is the right library?), then you can wrap it in stream_reader, which lets you call read_line on it and handle the body one line at a time instead of buffering all of it first.

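    use futures::stream::TryStreamExt; // provides map_err on streams of Results
    // tokio 0.2; in later tokio versions this lives in tokio_util::io::StreamReader
    use tokio::io::{stream_reader, AsyncBufReadExt};

    // stream_reader needs std::io::Error items, so wrap the awc payload error.
    fn into_io_error(err: awc::error::PayloadError) -> std::io::Error {
        std::io::Error::new(std::io::ErrorKind::Other, err)
    }

    // Note: no .body() here; the ClientResponse itself is the stream.
    let response = client.request(Method::GET, uri)
        .basic_auth(username, password)
        .query(params)
        .unwrap()
        .send()
        .await
        .unwrap();

    // read_line takes &mut self, so the reader must be mutable.
    let mut response = stream_reader(response.map_err(into_io_error));
    let mut line = String::new();
    while response.read_line(&mut line).await? != 0 {
        // use line now (it still ends with '\n')
        line.clear();
    }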

You have to convert the error manually, since stream_reader (and AsyncRead in general) requires the error type to be std::io::Error, so I added a function for doing so.
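Here is a std-only sketch of that conversion, in case the "some conversion" part is unclear. std::io::Error::new accepts any error type that is Send + Sync + 'static, so the payload error can be wrapped with ErrorKind::Other and its message is kept. FakePayloadError is a hypothetical stand-in for awc::error::PayloadError, used only so the example compiles without awc.

```rust
use std::error::Error;
use std::fmt;
use std::io;

// Hypothetical stand-in for awc::error::PayloadError, so this example
// compiles with the standard library alone.
#[derive(Debug)]
enum FakePayloadError {
    Overflow,
}

impl fmt::Display for FakePayloadError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            FakePayloadError::Overflow => write!(f, "payload reached size limit"),
        }
    }
}

impl Error for FakePayloadError {}

// Wrap the payload error so it can flow through APIs that only accept
// std::io::Error; the original error stays available as the inner error.
fn into_io_error(err: FakePayloadError) -> io::Error {
    io::Error::new(io::ErrorKind::Other, err)
}

fn main() {
    let io_err = into_io_error(FakePayloadError::Overflow);
    // Display of the io::Error delegates to the wrapped error's message.
    println!("{:?}: {}", io_err.kind(), io_err);
}
```

With the real crate, the only change should be the parameter type, which is exactly the into_io_error signature in the snippet above.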


Hi, thanks for your detailed response.
.body() gives me the struct: awc::response::MessageBody<actix_http::encoding::decoder::Decoder<actix_http::payload::Payload<std::pin::Pin<std::boxed::Box<dyn futures_lite::Stream<Item = std::result::Result<bytes::bytes::Bytes, actix_http::error::PayloadError>>>>>>>
It looks like the Stream is wrapped inside it. Is there a way to extract the Stream so I can use stream_reader on it?
Thanks.

ClientResponse already implements Stream, so you don't need .body() at all: pass the response from .send().await straight to stream_reader as in @alice's example, and it should work.
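Because the response arrives as a series of Bytes chunks, chunk boundaries usually don't line up with newlines; a line reader has to hold on to the partial line at the end of one chunk until the next chunk completes it. The std-only sketch below shows that buffering idea. LineSplitter and the sample chunks are hypothetical, plain byte slices stand in for the Bytes items the response stream yields, and unlike read_line it strips the trailing '\n'.

```rust
/// Hypothetical helper: accumulates byte chunks and yields complete lines,
/// keeping any trailing partial line until more data arrives.
struct LineSplitter {
    pending: Vec<u8>,
}

impl LineSplitter {
    fn new() -> Self {
        LineSplitter { pending: Vec::new() }
    }

    /// Feed one chunk and return every line it completes (without '\n').
    fn push(&mut self, chunk: &[u8]) -> Vec<String> {
        self.pending.extend_from_slice(chunk);
        let mut lines = Vec::new();
        while let Some(pos) = self.pending.iter().position(|&b| b == b'\n') {
            // Take the line including '\n' out of the buffer, then drop the '\n'.
            let mut line: Vec<u8> = self.pending.drain(..=pos).collect();
            line.pop();
            // Splitting on b'\n' never cuts a UTF-8 character in half.
            lines.push(String::from_utf8_lossy(&line).into_owned());
        }
        lines
    }

    /// Call once the stream ends: returns the last line if the body
    /// did not end with '\n'.
    fn finish(self) -> Option<String> {
        if self.pending.is_empty() {
            None
        } else {
            Some(String::from_utf8_lossy(&self.pending).into_owned())
        }
    }
}

fn main() {
    // Chunk boundaries deliberately fall in the middle of lines.
    let chunks: [&[u8]; 3] = [b"{\"id\":1}\n{\"i", b"d\":2}\n{\"id\"", b":3}"];
    let mut splitter = LineSplitter::new();
    for &chunk in chunks.iter() {
        for line in splitter.push(chunk) {
            // In the real program, each line would be parsed and POSTed here.
            println!("{}", line);
        }
    }
    if let Some(last) = splitter.finish() {
        println!("{}", last);
    }
}
```

Memory use then depends on the longest line rather than the whole body, which is why the streaming version avoids the Overflow error that .body() runs into.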

Yup, it worked. I didn't realize I didn't need .body() for it to work. Thanks!
