Managing exact concurrency with futures/tasks (web scraper)

tl;dr
I need a way to specify how many concurrent futures/tasks should be running at any point (basically, control how they are scheduled).

I'm a newbie in Rust, so be patient with me. I have built a simple web scraper that takes a list of URLs and an extraction config as input, downloads the HTML for each URL, and then extracts some data using the config.

The first version was done with just rayon, and then I didn't touch Rust for a year. Now that I've heard async is on stable, I added an async version (you can choose whether to go sync or async, so most functions are duplicated).

The way I do it now is super simple. I take the URLs, iterate over them, and call the scraping function, which returns a future. Then I simply join them all:

    pub async fn run_async(self) {
        // Build one future per URL; nothing runs until awaited.
        let futures = self.request_list.sources.iter()
            .map(|req| extract_data_from_url_async(req /* config args elided */));
        // Poll all the futures concurrently and wait for every one to finish.
        futures::future::join_all(futures).await;
    }

At first I heard about async-std, so I started with that. But I use reqwest, which I was not able to make work with it (if that is possible, I would like to hear how). So I ended up just using #[tokio::main].

The issue I face now is that if I provide 10k URLs, it launches all the futures at once. Sure, I can handle 10k, but probably not 1M, and besides, I don't want to DDoS a site. Also, since it allocates all the futures up front, it consumes quite a bit of memory, and it is harder to integrate with other APIs because of rate limits.

So I need to manage this concurrency. I know I could probably solve this by creating my own executor, but that is far beyond my skills. I'm searching for library functionality for this:

  1. At the first level, just to specify a (max) concurrency (see the sketch after this list).
  2. Later, I would like to handle the concurrency dynamically myself (scale up and down as needed).
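
For the first point, something like `buffer_unordered` from the futures crate looks like it could cap how many futures are polled at once. A minimal, untested sketch adapting my `run_async` above (the limit of 10 is an arbitrary example):

    use futures::stream::{self, StreamExt};

    pub async fn run_async(self) {
        // Turn the URL list into a stream of futures and poll at most
        // 10 of them at a time; the remaining ones wait their turn.
        stream::iter(self.request_list.sources.iter())
            .map(|req| extract_data_from_url_async(req /* config args elided */))
            .buffer_unordered(10)
            .collect::<Vec<_>>()
            .await;
    }

This only creates futures as slots free up, so it should also avoid allocating all 10k (or 1M) of them up front.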

I don't need to keep using the same libraries I do now. The reason I chose reqwest is that it is really simple and has a simple proxy interface (a proxy is a must for me). So if you suggest rewriting part of the code, I'm fine with that.
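
For context, this is roughly what the proxy setup looks like with reqwest (the proxy URL here is a made-up placeholder):

    use reqwest::{Client, Proxy};

    // Build a client that routes all traffic through a proxy.
    // The URL is a placeholder, not a real endpoint.
    fn client_with_proxy() -> Result<Client, reqwest::Error> {
        let proxy = Proxy::all("http://my-proxy.example:8080")?;
        Client::builder().proxy(proxy).build()
    }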

Here is the ugly codebase if you wish to take a look.

Please see the last part of this.

Can't you use a futures::channel::mpsc::channel() to send tasks to the scraper? When creating the channel you need to provide a buffer size, and it uses that to apply backpressure (i.e. so you don't launch 10,000 futures at once).
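
Roughly like this, as an untested sketch assuming a tokio runtime; the buffer size and worker limit of 10 are arbitrary, and `scrape` stands in for the real extraction function:

    use futures::{channel::mpsc, SinkExt, StreamExt};

    #[tokio::main]
    async fn main() {
        let urls: Vec<String> =
            (0..10_000).map(|i| format!("https://example.com/{i}")).collect();
        // A bounded channel: at most 10 URLs can be queued before
        // `send` has to wait. That waiting is the backpressure.
        let (mut tx, rx) = mpsc::channel::<String>(10);

        // Producer: feeds URLs in; pauses whenever the buffer is full.
        let producer = tokio::spawn(async move {
            for url in urls {
                tx.send(url).await.expect("receiver dropped");
            }
        });

        // Consumer: pulls URLs out and runs at most 10 scrapes at a time.
        let consumer = tokio::spawn(async move {
            rx.for_each_concurrent(10, |url| async move {
                scrape(&url).await;
            })
            .await;
        });

        producer.await.unwrap();
        consumer.await.unwrap();
    }

    // Placeholder for the real extract_data_from_url_async.
    async fn scrape(url: &str) {
        println!("scraping {url}");
    }

The bounded channel keeps memory flat (URLs are only queued as the consumer drains them), while `for_each_concurrent` caps the number of in-flight requests.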
