Mut vec empty while pushing in async loop

I'm iterating over a tokio AsyncBufRead stream built from a reqwest response body, parsing each line, and storing the parsed data in a vector. But it looks as if the work in the loop is only spawned rather than awaited, because the function returns early with data having a length of 0.

    let mut reader = StreamReader::new(request.try_clone().unwrap()
      .send().await?
      .bytes_stream().map_err(|e| {
        std::io::Error::new(std::io::ErrorKind::Other, e)
      })).lines();

    let mut data = Vec::new();
    while let Some(line) = reader.next_line().await? {
        data.push(
            line.split(|p| p == '\t').map(|f| {
                f.parse::<f32>().unwrap_or_default()
            }).collect::<Vec<f32>>()
        );
    }

What I'm looking for is to stream the request body through the parser and wait until the body has hit EOF and all lines are processed.

No, the vector cannot be the problem. For that data vec to remain empty, the loop must not execute at all, which means that .next_line returns Ok(None) on the very first call. So it would seem to me that the StreamReader was somehow empty. Perhaps you could check this somehow?

You do mention observing an early “return” without showing us the code that makes sure data ends up becoming the return value in question; the issue can of course also lie there.
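The reasoning above (an empty vector means the loop body never ran) can be checked without any network access. The following is only a minimal sketch: it swaps the async StreamReader for a synchronous std::io::BufRead over an in-memory Cursor, and parse_rows and row_count are hypothetical names that do not appear in the original code. The parsing closure is the same as in the question.

```rust
use std::io::{self, BufRead, Cursor};

/// Synchronous stand-in for the async loop: read lines until EOF and
/// parse each tab-separated field as f32 (unparsable fields become 0.0).
fn parse_rows<R: BufRead>(reader: R) -> io::Result<Vec<Vec<f32>>> {
    let mut data: Vec<Vec<f32>> = Vec::new();
    for line in reader.lines() {
        let line = line?;
        data.push(
            line.split('\t')
                .map(|f| f.parse::<f32>().unwrap_or_default())
                .collect::<Vec<f32>>(),
        );
    }
    Ok(data)
}

/// Hypothetical helper: number of rows parsed from an in-memory body.
fn row_count(body: &str) -> io::Result<usize> {
    Ok(parse_rows(Cursor::new(body))?.len())
}
```

With an empty body, row_count returns 0 because the loop never runs; with any non-empty body, every line ends up as a row. If the real function prints rows inside the loop, the same should hold there, so the place to look is whatever happens to data after the loop.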


If I put println!("{:?}", data); within the loop, it prints out the debug vector exactly as I would expect it. But as soon as it moves outside the loop, the data vector is empty.

The function simply returns Ok(data).

Just to get a second pair of eyes on the code that handles the return: could you share the code of the whole function with the printing and the returning, and perhaps also the code where it calls the function and observes the length to be 0 (presumably also with a print)?


Thanks for looking into the question, @steffahn. I solved the issue by returning the Vec in the Result rather than returning the Tensor. The Tensor was consuming the empty Vec in the async code and coming back empty.

    async fn chunk_tensor(&self, request: &RequestBuilder, chunk_size: &u64, chunk_count: u64, device: &Device) -> Result<Tensor> {
        let mut reader = StreamReader::new(
            request
                .try_clone()
                .unwrap()
                .body(format!(
                    "{} LIMIT {} OFFSET {} FORMAT TabSeparatedRaw",
                    self.query,
                    chunk_size,
                    chunk_count * chunk_size
                ))
                .send()
                .await?
                .bytes_stream()
                .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)),
        )
        .lines();

        let mut data = Vec::new();
        while let Some(line) = reader.next_line().await? {
            data.push(
                line.split(|p| p == '\t').map(|f| {
                    f.parse::<f32>().unwrap_or_default()
                }).collect::<Vec<f32>>()
            );
        }

        Ok(Tensor::new(data, device)?)
    }
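One way to read the fix described above is to keep the streaming function returning the plain rows and to do the tensor conversion in a separate step, where the row count can be inspected first. The sketch below only illustrates that split and is not the poster's actual code: FlatTensor and rows_to_tensor are hypothetical stand-ins, and no real tensor library is used.

```rust
/// Hypothetical stand-in for a tensor: flat data plus a (rows, cols) shape.
#[derive(Debug, PartialEq)]
struct FlatTensor {
    data: Vec<f32>,
    shape: (usize, usize),
}

/// Convert parsed rows, rejecting empty or ragged input instead of
/// silently producing an empty value.
fn rows_to_tensor(rows: Vec<Vec<f32>>) -> Result<FlatTensor, String> {
    // The first row decides the expected number of columns.
    let cols = match rows.first() {
        Some(first) => first.len(),
        None => return Err("no rows were parsed".to_string()),
    };
    // Report the first row whose length differs from the expected one.
    if let Some(i) = rows.iter().position(|r| r.len() != cols) {
        return Err(format!(
            "row {} has {} fields, expected {}",
            i,
            rows[i].len(),
            cols
        ));
    }
    let shape = (rows.len(), cols);
    let data: Vec<f32> = rows.into_iter().flatten().collect();
    Ok(FlatTensor { data, shape })
}
```

Because the conversion is a separate, explicit call, an empty result shows up as an error at the call site rather than as an empty value further downstream.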
