tokio::BufReader.lines().next_line().await never resolves, execution paused forever

I am very new to Rust (I literally learned it two weeks ago), and I've decided to make a faster Node package manager (like npm) as my first project.

Along the way I have encountered several issues, which I've resolved in the following manner:

  1. There seems to be only one popular, well-maintained and reputable crate for downloading files from the internet, reqwest, but it does not seem to have a built-in progress bar for downloads.

    So I implemented my own downloader with progress bar logic, using tokio::process::Command::new("wget").args(["--show-progress", "-q"]) and a tokio::io::BufReader on its stderr to read its output and display the progress bar with indicatif.

  2. I could not find any way of running multiple async functions (futures, in my case downloads) in parallel.

    So I am relying on ChatGPT code which goes like this:

       use futures::StreamExt; // provides map(), buffer_unordered() and next()

       #[tokio::main]
       async fn main() {
         // map() must return the future itself; .await is not allowed inside a plain closure
         let stream = futures::stream::iter(0..1000).map(|_i| download());
         let mut buffer = stream.buffer_unordered(CONCURRENT_DOWNLOADS_COUNT);
         while let Some(deps) = buffer.next().await {
           println!("{:?}", deps);
           // deps because download function returns a list of dependencies of the package to further download, I have separate logic for managing that
         }
       }
    

    It creates 1000 awaited download functions ahead of time & stores them in an array, then pools them concurrently.

  3. But all the download functions have to share the same state.
    So I have added Arcs, Mutexes & broadcasts (mpmc from tokio) & it's working with no errors.

But the problem is in the download function:

/* ...wget process created here... */
let reader = BufReader::new(wget.stderr.unwrap());
let mut lines = reader.lines();
while let Ok(Some(line)) = lines.next_line().await {
  println!("{}", line); // this never executes
  /* ...extracting percent complete from
    wget progress bar output here... */
}
println!("complete"); // runtime never comes here

As far as I have investigated, lines.next_line().await pauses forever; it never resolves.

This exact code works outside of stream::iter(0..1000).map(...), e.g. in a separate file, but when put inside the structure from #2, it does not work.

I have been trying for days on end, but I have not been able to figure out what exactly is wrong. Please help me fix this issue. I can share my full code if needed.

let buffer = stream.buffer_unordered(CONCURRENT_DOWNLOADS_COUNT);

This is a good way to manage concurrent downloads.

It creates 1000 awaited download functions ahead of time & stores them in an array, then pools them concurrently.

No, it doesn't create them ahead of time. The stream map() is lazy just like iterator map().
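
To make the laziness concrete, here is a minimal sketch using the plain iterator map() from the standard library, which is the analogue mentioned above (it does not use the futures crate). The function name closure_runs is made up for illustration; it counts how often the closure actually runs when only a few items are pulled.

```rust
use std::cell::Cell;

/// Hypothetical helper: counts how many times the closure given to `map`
/// runs when only `take_n` items are pulled from a range of `total` items.
fn closure_runs(total: u32, take_n: usize) -> u32 {
    let runs = Cell::new(0);
    let mapped = (0..total).map(|i| {
        runs.set(runs.get() + 1); // side effect so each call can be observed
        i * 2
    });
    // Nothing has run yet: `map` only wraps the underlying iterator.
    assert_eq!(runs.get(), 0);
    // Pulling items drives the closure, one call per item requested.
    let _pulled: Vec<u32> = mapped.take(take_n).collect();
    runs.get()
}

fn main() {
    println!("closure ran {} times", closure_runs(1000, 3));
}
```

Only the items that are actually requested cause the closure to run. With buffer_unordered the same idea should apply: a download future is created when the buffer has room for it, not up front.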

So I have added Arcs, Mutexes & broadcasts (mpmc from tokio) & it's working with no errors.

You probably don't need all or most of that, but it's hard to say without seeing more code.

As far as I have investigated, lines.next_line().await pauses forever; it never resolves.

wget's progress bar probably does not print any “lines” but uses \r to return the cursor to the beginning of the same line, so next_line() never sees a line ended with \n until the process finishes.
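
A small sketch of that effect, using the synchronous std::io::BufRead on an in-memory buffer rather than a real wget process. The progress bytes below are made up and are not wget's actual output format; the two function names are hypothetical too.

```rust
use std::io::{BufRead, Cursor};

/// Splits output on '\n' only, the way `lines()` does.
fn split_on_newline(output: &[u8]) -> Vec<String> {
    Cursor::new(output).lines().map(|l| l.unwrap()).collect()
}

/// Splits output on '\r', so each progress redraw becomes its own item.
fn split_on_carriage_return(output: &[u8]) -> Vec<String> {
    Cursor::new(output)
        .split(b'\r')
        .map(|seg| String::from_utf8_lossy(&seg.unwrap()).trim().to_string())
        .filter(|s| !s.is_empty()) // drop the empty piece before the first '\r'
        .collect()
}

fn main() {
    // Invented progress output: three redraws of one line, newline only at the end.
    let output = b"\r 10%\r 50%\r100%\n";
    println!("{:?}", split_on_newline(output));
    println!("{:?}", split_on_carriage_return(output));
}
```

Splitting on \n yields a single item that only becomes available once the final newline arrives, while splitting on \r yields one item per redraw. If you did want to keep wget, tokio's AsyncBufReadExt appears to offer a similar split(byte) method, though I have not tested it against wget's stderr.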

A better solution would be to avoid wget, continue using reqwest, and implement your own progress bar by counting the bytes as you receive them.

counting the bytes as you receive them

IT WORKED!!! Here is my updated download code:

async fn download(url: &str) -> Result<(), Error> {
    let mut file = OpenOptions::new()
        .append(true)
        .create(true)
        .open("my_file.jpg")
        .expect("failed to open file");

    let response = reqwest::get(url).await?;
    let file_size: usize = response
        .headers()
        .get(header::CONTENT_LENGTH)
        .expect("content length not found in headers")
        .to_str()
        .unwrap()
        .parse()
        .unwrap();
    let mut stream = response.bytes_stream();

    let mut sum = 0;
    while let Some(Ok(item)) = stream.next().await {
        file.write(&item).unwrap();

        sum += item.len();
        println!("{}", sum * 100 / file_size);
    }

    Ok(())
}

Thanks a lot.

I wonder if there could be any more optimisations, and (though this is off-topic) whether I can adopt a similar method for extracting the packages too, i.e. without using tokio::process::Command (presently I am using Command::new("tar")). One thing to note: the packages are .tgz, not .zip.

Anyways thank you very much.

You want write_all. The write method may not write the entire thing.
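
Here is a minimal sketch of the difference. ShortWriter is a made-up writer that accepts at most 4 bytes per call, to imitate a short write; a real File will usually take the whole buffer, but the Write trait does not guarantee it.

```rust
use std::io::{self, Write};

/// Hypothetical writer that accepts at most 4 bytes per `write` call,
/// imitating a sink that performs short writes.
struct ShortWriter {
    data: Vec<u8>,
}

impl Write for ShortWriter {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        let n = buf.len().min(4);
        self.data.extend_from_slice(&buf[..n]);
        Ok(n) // reports how many bytes were actually taken
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

/// Returns the bytes stored after a single `write` call.
fn stored_with_write(chunk: &[u8]) -> Vec<u8> {
    let mut w = ShortWriter { data: Vec::new() };
    let _ = w.write(chunk).unwrap(); // count ignored, so the rest is silently lost
    w.data
}

/// Returns the bytes stored after `write_all`, which retries until done.
fn stored_with_write_all(chunk: &[u8]) -> Vec<u8> {
    let mut w = ShortWriter { data: Vec::new() };
    w.write_all(chunk).unwrap();
    w.data
}

fn main() {
    let chunk = b"0123456789";
    println!("write:     {} bytes", stored_with_write(chunk).len());
    println!("write_all: {} bytes", stored_with_write_all(chunk).len());
}
```

In the download loop, that means replacing file.write(&item) with file.write_all(&item), ideally propagating the error instead of unwrapping it.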
