How to send 1M HTTP(S) requests asynchronously and in batches?

I have 1 million records in a DB. For each of them I need to send an HTTP request, wait for the result, and write it into the corresponding cell in the DB.

I want to implement it asynchronously and in batches of, let's say, 100 requests. I'd use a) tokio and b) a thread pool: tokio again, or rayon. Right?

And either a) wait for all 100 requests to finish before sending the next 100, or b) send a new request as soon as any previously sent one has returned a result.

How would I implement this in a simple manner?

I've chosen a batch size of 100 arbitrarily. How do I determine the proper size: 100? 1000? Or perhaps 30?

update 1

It could also be done not in batches but with N workers which concurrently pull tasks off a queue. How would I do this?

I don't think there is a single way to accomplish this, but here's one approach. It uses the futures crate, with Tokio as the runtime; reqwest is a good HTTP client library. The Stream trait is similar to what will eventually land in the Rust standard library, though there it will probably be called AsyncIterator.

Generally you would use rayon for CPU-bound workloads, not network/IO work. The method I'm outlining is demonstrated in this Stack Overflow answer: rust - How can I perform parallel asynchronous HTTP GET requests with reqwest? - Stack Overflow
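
For what it's worth, the shape of that pattern is roughly this. It's only a sketch: the URL list is a stand-in for whatever you build from your DB records, the concurrency limit of 10 is arbitrary, and the error handling and DB update are left out (dependencies assumed: tokio, futures, reqwest).

use futures::{stream, StreamExt};
use reqwest::Client;

#[tokio::main]
async fn main() {
    // One Client, reused for every request (it pools connections internally).
    let client = Client::new();

    // Placeholder input: in practice you would build these from your DB records.
    let urls: Vec<String> = (0..1_000)
        .map(|i| format!("https://example.com/item/{i}"))
        .collect();

    // Turn the list into a stream, map each URL to a request future,
    // and keep at most 10 of those futures running at any one time.
    stream::iter(urls)
        .map(|url| {
            let client = client.clone();
            async move { client.get(&url).send().await?.text().await }
        })
        .buffer_unordered(10)
        .for_each(|result| async move {
            match result {
                Ok(body) => println!("got {} bytes", body.len()),
                Err(e) => eprintln!("request failed: {e}"),
            }
        })
        .await;
}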

That's too generic, vague and incomplete.

I'd already seen that link before asking my question.

Hmm... That answer seemed pretty long and fairly complete to me. Could you clarify what you need guidance on that isn't addressed by the accepted Stack Overflow answer?


Apologies if this is unwanted, but I ran this thread through ChatGPT and liked its answer. I've provided its take on the situation below. I'm also a bit interested in how more experienced users view its response. Let me know if you want it removed! Anyway, here goes...

[Reply #4]
User: ChatGPT
Replying To: Kura.Den
Content:

Hello Kura.Den,

I can provide a slightly more specific approach to handling your task, using a combination of Tokio and Reqwest which have the advantage of being async-friendly and providing a fairly straightforward interface for HTTP(S) requests.

Here's a general idea of how to structure your code to send HTTP requests in chunks, and update your DB records asynchronously. This isn't a complete program, but an outline:

use reqwest::Client;
use tokio::task;
use std::sync::Arc;

async fn process_batch(client: Arc<Client>, batch: Vec<Record>) {
    // Create a Vec of Futures that you can await later
    let tasks: Vec<_> = batch.into_iter().map(|record| {
        let client = Arc::clone(&client);
        task::spawn(async move {
            // Make your request using `client` and `record` here.
            // Then, update your DB record with the response.
            // Handle errors appropriately.
        })
    }).collect();

    // Wait for all requests in the batch to complete.
    let _results: Vec<_> = futures::future::join_all(tasks).await;
}

#[tokio::main]
async fn main() {
    // Client is wrapped in an Arc to allow sharing across tasks
    let client = Arc::new(Client::new());

    // Fetch your 1M records from the DB
    let records = fetch_records_from_db();

    // Process your records in batches
    let chunks = records.chunks(100);
    for batch in chunks {
        process_batch(Arc::clone(&client), batch.to_vec()).await;
    }
}

This assumes you have a Record type that represents a single record from your DB. Replace Record and fetch_records_from_db() with your actual data fetching code.

As for the batch size, it depends on a number of factors. The more requests you send at once, the more memory your program will use. But if you send too few requests at once, you won't fully utilize your network resources. You might want to experiment with different batch sizes to see what works best for your specific use case.

For your second question regarding N workers pulling off a queue, you may consider using a work-stealing model. Rust has a few work-stealing queue libraries such as rayon, async_nursery, async-task which can be used along with async channels (tokio::sync::mpsc). The concept is to put all your tasks (http requests in this case) into an async channel and let your workers pull from it and process them.
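
For instance, here is a minimal sketch of the channel-plus-workers idea. The Record type, the URLs, and the worker count of 20 are placeholders, and the DB update is left as a comment; a multi-consumer channel crate such as async-channel or flume would let you drop the Mutex around the receiver.

use std::sync::Arc;

use reqwest::Client;
use tokio::sync::{mpsc, Mutex};

// Placeholder record type; substitute your real DB row.
#[derive(Debug)]
struct Record {
    id: i64,
    url: String,
}

#[tokio::main]
async fn main() {
    let client = Client::new();
    let (tx, rx) = mpsc::channel::<Record>(100);
    // Share the single receiver behind a mutex so every worker can pull from it.
    let rx = Arc::new(Mutex::new(rx));

    // Spawn N workers that keep pulling records until the channel closes.
    let n_workers = 20;
    let mut handles = Vec::new();
    for _ in 0..n_workers {
        let client = client.clone();
        let rx = Arc::clone(&rx);
        handles.push(tokio::spawn(async move {
            loop {
                // Hold the lock only long enough to take the next record.
                let next = { rx.lock().await.recv().await };
                let Some(record) = next else { break };
                match client.get(&record.url).send().await {
                    Ok(resp) => {
                        let body = resp.text().await.unwrap_or_default();
                        // Update the DB row for record.id with `body` here.
                        println!("record {} -> {} bytes", record.id, body.len());
                    }
                    Err(e) => eprintln!("record {} failed: {e}", record.id),
                }
            }
        }));
    }

    // Producer: feed records into the channel (page through the DB in practice).
    for id in 0..1_000 {
        let record = Record { id, url: format!("https://example.com/item/{id}") };
        tx.send(record).await.expect("all workers have stopped");
    }
    drop(tx); // closing the sender lets the workers drain the queue and exit

    for handle in handles {
        handle.await.unwrap();
    }
}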

Remember to handle errors appropriately and perhaps add some backoff or rate limiting mechanism to prevent overloading the server with requests.

I hope this gives you a clearer direction. Let me know if there's anything you'd like me to expand upon.


If you want to use as few libraries as possible and generally keep things simple and reliable, I would go for this approach. Threads are built in. Here is the manual: Using Threads to Run Code Simultaneously - The Rust Programming Language. Then you could do just as you described: make a queue, lock access to that queue, and have each thread read from it and update the database directly.
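
Here is a minimal sketch of that thread-plus-queue idea, assuming reqwest's blocking client (the "blocking" feature); the Record type and URLs are placeholders and the DB update is left as a comment.

use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use std::thread;

// Placeholder record type; substitute your real DB row.
struct Record {
    id: i64,
    url: String,
}

fn main() {
    // Fill the queue up front (or refill it page by page from the DB).
    let queue: VecDeque<Record> = (0..1_000)
        .map(|id| Record { id, url: format!("https://example.com/item/{id}") })
        .collect();
    let queue = Arc::new(Mutex::new(queue));

    let n_threads = 20;
    let mut handles = Vec::new();
    for _ in 0..n_threads {
        let queue = Arc::clone(&queue);
        handles.push(thread::spawn(move || {
            // Each thread gets its own blocking HTTP client.
            let client = reqwest::blocking::Client::new();
            loop {
                // Hold the lock only long enough to pop one item.
                let next = queue.lock().unwrap().pop_front();
                let Some(record) = next else { break };
                match client.get(&record.url).send().and_then(|r| r.text()) {
                    Ok(body) => {
                        // Update the DB row for record.id with `body` here.
                        println!("record {} -> {} bytes", record.id, body.len());
                    }
                    Err(e) => eprintln!("record {} failed: {e}", record.id),
                }
            }
        }));
    }

    for handle in handles {
        handle.join().unwrap();
    }
}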

What if this consumes all of the RAM?

I'd fetch records from the DB in pages, though, by wrapping the code

    // Fetch your 1M records from the DB
    let records = fetch_records_from_db();

in a do ... while-style loop which fetches one page of 100...1000 records from the DB.

And this would be it? No other changes?

Just going to give you its response. : )

[Reply #7]
User: ChatGPT
Replying To: Kura.Den
Content:

Hello again Kura.Den,

Your concern about memory is a valid one, especially when working with such large datasets. In this case, it would indeed be better to fetch and process the records in pages.

Here's how you could modify the code to incorporate this idea:

use reqwest::Client;
use tokio::task;
use std::sync::Arc;

async fn process_batch(client: Arc<Client>, batch: Vec<Record>) {
    // The same code as before...
}

#[tokio::main]
async fn main() {
    let client = Arc::new(Client::new());
    
    let mut page = 0;
    let page_size = 1000;  // or whatever number you've found works best

    loop {
        let records = fetch_records_from_db_page(page, page_size);
        if records.is_empty() {
            break;
        }
        
        process_batch(Arc::clone(&client), records).await;
        page += 1;
    }
}

In this modified version, fetch_records_from_db_page(page, page_size) is a function that fetches one page of records from the DB, and you increment the page number after each batch. Replace this function with your actual paging code.

The loop ends when fetch_records_from_db_page returns an empty Vec, which should happen when there are no more records to fetch from the DB.

This way, you're only holding page_size number of records in memory at any given time, which should help with the memory usage. The rest of the code can remain the same.
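
Just to illustrate what the paging function might look like, here is a sketch that assumes sqlx with Postgres, a table named records, and LIMIT/OFFSET paging; it is async and returns a Result, so the caller would .await it and handle the error rather than calling it exactly as in the outline above.

use sqlx::PgPool;

// Placeholder row type; the columns are assumptions.
#[derive(sqlx::FromRow)]
struct Record {
    id: i64,
    url: String,
}

// Fetch one page of rows, LIMIT/OFFSET style.
async fn fetch_records_from_db_page(
    pool: &PgPool,
    page: i64,
    page_size: i64,
) -> Result<Vec<Record>, sqlx::Error> {
    sqlx::query_as::<_, Record>(
        "SELECT id, url FROM records ORDER BY id LIMIT $1 OFFSET $2",
    )
    .bind(page_size)
    .bind(page * page_size)
    .fetch_all(pool)
    .await
}

One caveat with OFFSET paging: on a million rows the later pages get progressively slower, so keyset pagination (WHERE id > last_seen_id ORDER BY id LIMIT n) is usually the better choice.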

Hope this helps and let me know if you have further questions!


You will need Tokio or a similar async executor. You probably don't need multiple threads, and you certainly don't need Rayon, which is used for CPU-intensive and not IO-intensive tasks.

Your bottleneck is going to be the sending of HTTP requests. I don't know what site you are querying, but in general it's not good etiquette, and it may not even be physically possible, to open hundreds of simultaneous connections to a remote machine.

Let's say you limit yourself to 10 simultaneous connections and the server responds quickly (avg. 30 ms per request). Then for 1 million requests, it will take 10^6 / 10 * 0.03 = 3000 seconds, or 50 minutes to download the data you need over HTTP.

In comparison, reading and writing 1 million records to even a slow database shouldn't take more than a few minutes. If it does, there's probably something wrong with your DB or the way you're accessing the DB. The work your Rust program does, sending and parsing requests, will only take a few seconds of CPU time at the most. In other words, your Rust program is going to spend almost all of its time waiting for data coming and going over the network, so it's perfectly possible to run all your Rust logic on a single-threaded Tokio runtime.

A fairly simple and robust solution would be something like:

  1. Use the Streaming feature of the SeaORM crate to read data in chunks from your DB. This will keep memory load low. If SeaORM doesn't support your DB, you can find some way to SELECT data in pages.
  2. Use reqwest together with asynchronous streams (e.g. buffer_unordered from the futures crate) to limit how many requests are in flight and execute them concurrently. This blog post has a good explanation of how to do this for a large number of requests.
  3. As each HTTP response comes back, send an UPDATE to the database using an ORM or raw SQL (steps 2 and 3 are sketched just below).
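
Roughly, steps 2 and 3 glued together could look like the sketch below. The Record type, the records table and response column, the use of sqlx rather than SeaORM for the UPDATE, and the concurrency limit of 10 are all assumptions; step 1 is assumed to hand this function one page of records at a time.

use futures::{stream, StreamExt};
use reqwest::Client;
use sqlx::PgPool;

// Placeholder row type produced by step 1 (the DB stream or paged SELECT).
#[derive(sqlx::FromRow)]
struct Record {
    id: i64,
    url: String,
}

async fn process_page(client: &Client, pool: &PgPool, records: Vec<Record>) {
    // Step 2: run the HTTP requests with bounded concurrency.
    stream::iter(records)
        .map(|record| {
            let client = client.clone();
            async move {
                let body = client.get(&record.url).send().await?.text().await?;
                Ok::<_, reqwest::Error>((record.id, body))
            }
        })
        .buffer_unordered(10)
        // Step 3: as each response comes back, write it to the DB.
        .for_each(|result| async move {
            match result {
                Ok((id, body)) => {
                    if let Err(e) = sqlx::query("UPDATE records SET response = $1 WHERE id = $2")
                        .bind(body)
                        .bind(id)
                        .execute(pool)
                        .await
                    {
                        eprintln!("DB update for record {id} failed: {e}");
                    }
                }
                Err(e) => eprintln!("request failed: {e}"),
            }
        })
        .await;
}

The UPDATEs here run one at a time inside for_each, which should be fine given that the DB writes are much faster than the HTTP round trips; if they ever become the bottleneck, for_each_concurrent gives them a bounded concurrency of their own.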

Hi, ChatGPT responses are banned on the forums. Please do not post any ChatGPT responses here, thank you.


Hi Kura,

I had nearly the exact same question for an app I just wrote - you may be interested in this version using tokio and streams:
