Stream CSV file from hyper body into Deserializer

Hi everyone,

I have been looking for a while on the internet and on this forum for a way to do this but have not found a straight answer.

I'm trying to deserialize a file downloaded via hyper without waiting for the download to finish. It is a big CSV (300 MB) and it must be read sequentially, so there must be a way, right? Or am I missing something?

At the moment my code works but downloads the entire file before starting the deserialization.

    let mut elements = Vec::new();
    // Download big_file.csv from the source
    let http = HttpConnector::new();
    let client = Arc::new(Client::builder()
        .build::<_, hyper::Body>(http));
    let url = "http://get-csv.com".to_string();
    let req = Request::get(url).body(Body::empty()).expect("Request builder");
    // Fetch the url...
    let res = client.request(req).await?;
    let body = body::to_bytes(res.into_body()).await?;

    //Read the body of the big_file.csv line by line and deserialize into struct
    let mut rdr = csv::Reader::from_reader(body.reader());
    for result in rdr.deserialize() {
        //type hint for automatic deserialization.
        let record: csv::Result<Element> = result;
        match record {
            Ok(element) => {
                elements.push(element);
            },
            Err(err) => {
                return Err(ResponseError::from(err))
            },
        }
    }

I'm using the csv crate, which takes a Reader as a parameter.

You cannot pass an asynchronous body directly to the csv crate, but you could read it line-by-line and parse each line separately. This can be done with something like the following:

    use tokio::io::AsyncBufReadExt; // for read_line
    use futures::stream::TryStreamExt; // for map_err

    fn hyper_to_io(hyper: hyper::Error) -> std::io::Error {
        std::io::Error::new(std::io::ErrorKind::Other, hyper)
    }

    // Fetch the url...
    let res = client.request(req).await?;
    let mut body = tokio_util::io::StreamReader::new(res.into_body().map_err(hyper_to_io));

    let mut line = String::new();
    loop {
        line.clear();
        let len = body.read_line(&mut line).await?;
        if len == 0 {
            // Reached end of file.
            break;
        }

        let mut rdr = csv::Reader::from_reader(line.as_bytes());

        for result in rdr.deserialize() {
            // Type hint for automatic deserialization.
            let record: csv::Result<Element> = result;
            match record {
                Ok(element) => {
                    elements.push(element);
                },
                Err(err) => {
                    return Err(ResponseError::from(err))
                },
            }
        }
    }

If you put a Cursor<Vec<u8>> into the csv::Reader instead, you might even be able to reuse the csv reader by using read_until directly on the vector inside the reader through get_mut.

Edit: I removed the while loop that tries to read additional lines if available, because I realized that won't work with read_line: partial lines can be lost when using it together with now_or_never.


Hi,

Thanks for the quick reply! Could you show me what code you had in mind using the Cursor? I would really like to try that one, but I don't seem to get it right.

I was able to try the first solution you described, and I realized it's actually not that easy, since creating a new reader on every iteration skips the "read & remember the headers" part.

There is a way to manually set the headers on every iteration with rdr.set_headers(StringRecord::from(vec!["a", "b", "c"]));. Besides being less than ideal, I also stumbled upon a csv-specific error, err: DeserializeError { field: None, kind: UnexpectedEndOfRow }, which is out of the scope of this topic.

Nevertheless, I'm curious to try the "Cursor" solution if that's a real possibility.

Thank you for your help.

I haven't tried it, but I think it would look like this:

    use tokio::io::AsyncBufReadExt; // for read_until
    use futures::stream::TryStreamExt; // for map_err
    use std::io::Cursor;

    fn hyper_to_io(hyper: hyper::Error) -> std::io::Error {
        std::io::Error::new(std::io::ErrorKind::Other, hyper)
    }

    /// This function avoids keeping the entire csv file in memory by removing
    /// the parts that have already been read from it.
    fn shorten_vec(vec: &mut Cursor<Vec<u8>>) {
        let position = vec.position() as usize;
        // Remove everything in the vector up to position.
        vec.get_mut().drain(..position);
        // Set the position to zero.
        vec.set_position(0);
    }

    // Fetch the url...
    let res = client.request(req).await?;
    let mut body = tokio_util::io::StreamReader::new(res.into_body().map_err(hyper_to_io));

    let mut rdr = csv::Reader::from_reader(Cursor::new(Vec::new()));
    let mut iter = rdr.deserialize();

    loop {
        let inner_vec = iter.reader_mut().get_mut();
        shorten_vec(inner_vec);
        let len = body.read_until(b'\n', inner_vec.get_mut()).await?;
        if len == 0 {
            // Reached end of file.
            break;
        }

        let record: csv::Result<Element> = iter.next().expect("EOF even though we got a line");
        match record {
            Ok(element) => {
                elements.push(element);
            },
            Err(err) => {
                return Err(ResponseError::from(err))
            },
        }
    }

Thanks for your example alice. I got it working by loading both the first (header) line and the second line in before advancing the iterator (note that I'm using tokio v0.2 for compatibility reasons):

    // note: in later tokio versions the StreamReader moved to tokio_util
    let mut body = tokio::io::stream_reader(res.into_body().map_err(|_err| {
        tokio::io::Error::new(tokio::io::ErrorKind::Other, "unimplemented")
    }));

    let mut reader = csv::Reader::from_reader(Cursor::new(Vec::new()));
    let mut iter = reader.deserialize();
    let mut first_line = true;

    loop {
        let inner_vec = iter.reader_mut().get_mut();
        shorten_vec(inner_vec);
        // note: here I need another get_mut() to get the underlying vector
        // instead of the Cursor. Maybe this also changed in later versions?
        let len = body.read_until(b'\n', inner_vec.get_mut()).await?;
        if len == 0 {
            // Reached end of file.
            break;
        }

        if first_line {
            first_line = false;
            continue;
        }

        let record: csv::Result<Element> = iter.next().expect("EOF even though we got a line");
        match record {
            Ok(element) => {
                elements.push(element);
            },
            Err(err) => {
                return Err(ResponseError::from(err))
            },
        }
    }

Great to hear that you got it to work!

Nah, this was just a mistake in my original snippet. I have edited it to do it correctly.

Thank you both for your answers. I was able to compile the code, but it still doesn't work. It feels like the Cursor does not advance at all: when I print the position in the shorten_vec() function, it always returns 0.

I have created a repo with this hoping someone could take a look at a concrete example.

https://github.com/arnaudpoullet/rust_csv_stream

It is because the csv file uses quotes to split some items across several lines. You're going to need to check whether the full row has actually been received.

Maybe there is an async-enabled crate already you can use?

Does this mean a synchronous body would make this possible? I still don't understand why what I'm trying to achieve is so difficult. Reading an HTTP body to deserialize is like reading a file you deserialize: a sequential set of bytes, right?

I've updated the with_stream package to use a csv file with no newlines inside fields, and that does seem to work better.

Still, when executing it I get a thread 'actix-rt:worker:0' panicked at 'Receiver::next_message called after None' error.

I'm hoping the https://github.com/haydnv/destream WIP crate will one day offer the possibility of async csv deserialization, but I don't really need async if I can find a solution that works without it.

Yes, but the only way to use a synchronous body in async code would be to read the entire file into memory before starting to parse it, because the alternatives block the executor.

Which line in your project is that?

It seems to be in the futures library:

    /home/arnaud/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-channel-0.3.12/src/mpsc/mod.rs:1035:41

In my use case, downloading and parsing the csv is one task at a time anyway, and waiting for my csv to download is blocking anyway. Could you show me one such alternative? If I spawn a thread to do this, does that not make it non-blocking?

If you pass RUST_BACKTRACE=1 it will tell you where in your code it happened.

As for blocking, you can use reqwest::blocking, whose Response struct can be used directly as the reader for your csv. Be aware that it must not be used in async code, so you need a dedicated thread to run it on.

Sorry about this, it seems to be in the read_until call:

    15: tokio::io::util::read_until::read_until_internal
                 at /home/arnaud/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.1.1/src/io/util/read_until.rs:55:36
    16: <tokio::io::util::read_until::ReadUntil<R> as core::future::future::Future>::poll
                 at /home/arnaud/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.1.1/src/io/util/read_until.rs:77:9
    17: with_stream::download_and_parse_csv::{{closure}}
                 at ./src/main.rs:78:19

Oh well, that doesn't suit my needs, since I'm making async requests instead of just adding the elements to a vector. It seems that what I'm trying to do has no easy solution. Still, I do appreciate the help and hope this thread will help others on the subject.

I should have found the csv_async crate much earlier; it works well with the reqwest crate. This issue makes me think it's not possible with the hyper crate directly. I have added a "solution" package to my demo repo and hope this will help others. Thanks a lot Alice for your insights.

    use futures::stream::{StreamExt, TryStreamExt}; // for map, into_async_read and next

    async fn download_and_parse_csv() -> Result<(), ResponseError> {
        let mut elements = Vec::new();

        let url =
            "https://raw.githubusercontent.com/arnaudpoullet/rust_csv_stream/master/static/myFile0.csv"
                .to_string();
        let res = reqwest::get(&url).await?;
        let body = res
            .bytes_stream()
            .map(|result| {
                result.map_err(|error| {
                    std::io::Error::new(std::io::ErrorKind::Other, format!("{}", error))
                })
            })
            .into_async_read();
        // Read the body of the csv line by line and deserialize into Element if possible
        let mut rdr = csv_async::AsyncDeserializer::from_reader(body);
        let mut records = rdr.deserialize::<Element>();
        while let Some(record) = records.next().await {
            match record {
                Ok(element) => {
                    // Process each element as it comes in
                    elements.push(element);
                }
                Err(err) => return Err(ResponseError::from(err)),
            }
        }
        Ok(())
    }

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.