Concurrent queue

Hi folks,
Disclaimer: Rust newbie.

I'm pulling about 6 million records via an API with a hard limit of 1k records per response, resulting in 6k sequential requests (the API doesn't allow parallel requests).

My current implementation fetches 1k records, spawns a task using tokio async to persist the records into a db using sqlx, and repeats the cycle. I chose Tokio because of the ability to abort threads should one thread fail (JoinSet). The returned records are split and tokio threads are started to insert them.

I'm thinking of moving the fetching of the records into a separate thread, which will allow fetching to proceed while previously fetched records are being processed.

Any advice on the best approach and framework is welcome.

Is there a concurrent queue in Rust that allows different threads to insert and pop objects, with a fetcher thread inserting and other threads popping and processing?

Thank you.


I'm not even sure you are using "threads" correctly (tokio tasks don't correspond to OS or hardware threads!), but I don't think it's a particularly good idea to hammer the DB with insertions in parallel.

One thing is that with too many parallel requests, you'll unnecessarily saturate the concurrency control system of the DB (assuming you are using a client-server database). The bigger problem is that you will thus spawn many transactions, which is potentially much slower than a single transaction with several insertions could be.

I think a better approach would be to hand over the data to be inserted to a single dedicated thread, which would wait for a complete batch, and then insert them all at once in a single transaction. This would give you all-or-none failure handling behavior for free, too.
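A minimal sketch of that batching idea, using only the standard library. The `flush` closure is a hypothetical stand-in for "open a transaction, insert the batch, commit" (with sqlx that would be an async call; this is not sqlx's API), and `drain_in_batches` and its parameters are illustrative names.

```rust
use std::sync::mpsc::Receiver;

/// Drains `rx`, grouping records into batches of `batch_size` and handing
/// each batch to `flush` (a stand-in for "one transaction per batch").
/// Returns the number of batches flushed.
fn drain_in_batches<T>(
    rx: Receiver<T>,
    batch_size: usize,
    mut flush: impl FnMut(&[T]),
) -> usize {
    assert!(batch_size > 0, "batch_size must be non-zero");
    let mut batch = Vec::with_capacity(batch_size);
    let mut flushed = 0;
    // Iterating over `rx` ends once every sender has been dropped.
    for record in rx {
        batch.push(record);
        if batch.len() == batch_size {
            flush(&batch);
            flushed += 1;
            batch.clear();
        }
    }
    // Flush the final, possibly short, batch.
    if !batch.is_empty() {
        flush(&batch);
        flushed += 1;
    }
    flushed
}
```

The function would run on the dedicated thread, with the fetching side holding the matching `Sender`. Dropping the sender is what tells the batcher that no more records are coming.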


I'm actually spawning 5 tasks, not necessarily threads. Per my understanding, the Tokio runtime executes these lightweight tasks on its own threads, with each task inserting 200 records batched together.

So in effect, only 5 tasks are spawned and awaited. I measured the time elapsed, and the total insertion time is a fraction of a second.

So, what problem are you trying to solve? "A fraction of a second" could mean a lot of things, but conservatively let's say it's 500 ms. With 6,000 batches, that works out to about 50 minutes. It sounds like a batch processing scenario, and maybe 50 minutes is adequate for that?

Apart from the actual goal, I also can't tell if you are describing a method of lowering latency by fetching from the API while the DB tasks are doing their thing. That sort of thing is done with conceptual pipelines, which you build from async primitives. It looks like there are also some crates for it, but they don't have much popularity (they are thin abstractions over future and stream chaining). The tokio-stream crate (like futures) already provides stream combinators such as map for this.

Sorry for the lack of details in my previous post. On average it takes about 0.012193 seconds (12,193 microseconds) to insert one batch of 200 records, so for the 1,000 records of each pull that's roughly 0.060965 seconds (60,965 microseconds). Over 6,000 pulls, that makes about 366 seconds, or roughly 6 minutes, to insert 6 million records.

The goal was to have a separate thread pull the data into a queue while the other thread(s) consume the data. Looking at the total time for insertion, I don't think having a queue will make much difference, since nearly all the time is spent fetching the records.
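For reference, the arithmetic behind these estimates can be checked with `std::time::Duration`. `total_insert_time` is an illustrative helper, and it assumes the batches belonging to one request are inserted one after another rather than concurrently.

```rust
use std::time::Duration;

/// Total insert time if every request's batches are inserted back to back.
fn total_insert_time(per_batch: Duration, batches_per_request: u32, requests: u32) -> Duration {
    per_batch * batches_per_request * requests
}
```

With 12,193 microseconds per batch, 5 batches per request, and 6,000 requests, this gives 365.79 seconds, a little over 6 minutes. With the conservative 500 ms per request mentioned earlier, it gives 3,000 seconds, which is the 50-minute figure.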

That's why I brought up latency and pipelines. Even if most of the time is taken by fetching from the external API, you have two sources of latency to deal with:

  1. Latency from the API call.
  2. Latency from the DB insertions.

Without a "pipeline" (i.e., the naive approach), each task will first be blocked by the fetch and then also blocked by the DB insert. In other words, the operations are sequentially executed. You can't start the next API fetch until the current DB insert completes.

[image: diagram of the sequential fetch-then-insert flow]

Even if you have N tasks, the total latency is still bounded by both calls. A "pipeline" splits up the API and DB operations so that they are logically independent of one another. In theory, this means that you can have a task that fetches the next batch from the API when it is ready, without waiting for any DB operations. The DB operations are inherently dependent upon results from the API.

In the pipelined version, we have two tasks with separate roles. They share data through a channel (mpmc) and operate independently. The fetch task grabs the next batch from the API as soon as it is able (after potentially blocking on the channel). The DB task inserts the next available batch (after potentially blocking on the channel).
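A rough sketch of such a two-stage pipeline, written with standard-library threads and a bounded `sync_channel` so it runs without extra crates. The same shape should carry over to Tokio tasks with `tokio::sync::mpsc::channel`. `run_pipeline`, `fetch`, and `insert` are hypothetical names; the closures stand in for the real API call and the real DB insert.

```rust
use std::sync::mpsc::sync_channel;
use std::thread;

/// Runs a two-stage pipeline: one thread produces pages with `fetch`, another
/// consumes them with `insert`. Returns the number of pages inserted.
fn run_pipeline<P, F, I>(pages: usize, capacity: usize, mut fetch: F, mut insert: I) -> usize
where
    P: Send + 'static,
    F: FnMut(usize) -> P + Send + 'static,
    I: FnMut(P) + Send + 'static,
{
    // Bounded channel: `send` blocks once `capacity` pages are waiting,
    // so the fetcher cannot run arbitrarily far ahead of the database.
    let (tx, rx) = sync_channel::<P>(capacity);

    let fetcher = thread::spawn(move || {
        for page in 0..pages {
            // Stop early if the consumer has gone away.
            if tx.send(fetch(page)).is_err() {
                break;
            }
        }
        // `tx` is dropped here, which ends the consumer's loop.
    });

    let inserter = thread::spawn(move || {
        let mut inserted = 0;
        for page in rx {
            insert(page);
            inserted += 1;
        }
        inserted
    });

    fetcher.join().expect("fetch thread panicked");
    inserter.join().expect("insert thread panicked")
}
```

The fetcher never waits for an insert to finish unless the channel is full, which is the overlap the pipeline is meant to buy.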

Now you can have N fetch tasks and M DB tasks. Scale them independently, etc. This design has an issue that the channel may become a bottleneck from contention. Given that the tasks take multiple milliseconds and there are only 5 of them, you probably won't have any contention issues [1].
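As a hedged illustration of the "M DB tasks" side: the standard library's channel is multi-producer, single-consumer, so one way to let several workers pop from the same queue is to share the receiver behind a mutex. Crates such as crossbeam-channel, flume, or async-channel offer true mpmc channels and avoid that lock. `fan_out` is an illustrative name, and the "insert" is reduced to a counter.

```rust
use std::sync::mpsc::sync_channel;
use std::sync::{Arc, Mutex};
use std::thread;

/// Feeds `items` through a bounded queue to `workers` consumer threads and
/// returns how many items each worker handled.
fn fan_out(items: Vec<u64>, workers: usize, capacity: usize) -> Vec<usize> {
    assert!(workers > 0, "need at least one worker to drain the queue");
    let (tx, rx) = sync_channel::<u64>(capacity);
    // std's receiver is single-consumer, so share it behind a mutex.
    let rx = Arc::new(Mutex::new(rx));

    let handles: Vec<_> = (0..workers)
        .map(|_| {
            let rx = Arc::clone(&rx);
            thread::spawn(move || {
                let mut handled = 0usize;
                loop {
                    // The lock is released at the end of this statement.
                    let next = rx.lock().unwrap().recv();
                    match next {
                        Ok(_item) => handled += 1, // the DB insert would go here
                        Err(_) => break,           // sender dropped, queue drained
                    }
                }
                handled
            })
        })
        .collect();

    for item in items {
        tx.send(item).expect("all workers exited early");
    }
    drop(tx); // close the queue so the workers can finish

    handles.into_iter().map(|h| h.join().unwrap()).collect()
}
```

How the items are split between workers depends on scheduling, so only the total is predictable.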

But this does have at least one advantage: where the API permits concurrent requests (the one in this thread reportedly does not), a large N (number of fetch tasks) can parallelize many API calls to amortize the API latency without incurring any DB latency, assuming that either 1. the DB inserts are orders of magnitude faster, or 2. your channel is unbounded or at least large enough to hold all of the batches. Keep in mind that a large channel is not bulletproof! It basically destroys the advantages of back-pressure and can easily lead to lower resilience. It's an in-memory queue that goes away when the process dies.
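To make the back-pressure point concrete, here is a small sketch with a bounded standard-library channel that nobody is draining. `accepted_before_full` is an illustrative helper, not part of any library.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

/// Tries to queue `n` items on a bounded channel that nobody is draining and
/// returns how many were accepted before the channel reported it was full.
fn accepted_before_full(capacity: usize, n: usize) -> usize {
    let (tx, _rx) = sync_channel::<usize>(capacity);
    let mut accepted = 0;
    for i in 0..n {
        match tx.try_send(i) {
            Ok(()) => accepted += 1,
            // A full queue is the back-pressure signal: the producer must
            // wait (or slow down) instead of buffering without limit.
            Err(TrySendError::Full(_)) => break,
            Err(TrySendError::Disconnected(_)) => break,
        }
    }
    accepted
}
```

With an unbounded channel the same loop would accept every item, and memory use would grow with however far the producer gets ahead of the consumer.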

Now is also a good time to bring up Amdahl's law. In simple terms, the speedup you can get from parallelizing is capped by the part of the work that has to stay serial. Throwing out a random number again, say that the API fetch takes 100 ms. Even with everything else fully parallelized, the fastest that a single batch could theoretically get through the whole process is 100 ms + 12 ms, because you cannot reduce the latency of that API fetch by "parallelizing it further" and the DB inserts depend on the API fetches. (And the sequence diagram is a visualization of this property.)
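A back-of-the-envelope model of that bound, as a sketch only. It assumes constant stage times, a single fetcher (the API in this thread is sequential), and a single inserter; the 100 ms and 12 ms figures are the hypothetical numbers from the paragraph above, and both function names are illustrative.

```rust
use std::time::Duration;

/// One fetch followed by one insert, repeated `n` times with no overlap.
fn sequential_total(n: u32, fetch: Duration, insert: Duration) -> Duration {
    (fetch + insert) * n
}

/// Two-stage pipeline with one fetcher and one inserter: after the first
/// item fills the pipeline, throughput is set by the slower stage.
fn pipelined_total(n: u32, fetch: Duration, insert: Duration) -> Duration {
    if n == 0 {
        return Duration::ZERO;
    }
    fetch + insert + fetch.max(insert) * (n - 1)
}
```

For 6,000 requests at 100 ms per fetch and 12 ms per insert, the sequential total is 672 seconds and the pipelined total is about 600 seconds. Under these assumptions the pipeline hides almost all of the insert time, but the 6,000 sequential fetches still dominate.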

This is a theoretical minimum, of course. You are more likely to be bounded on the number of concurrent API requests and concurrent DB inserts than anything else. And it leads directly back to your observation that having a queue may not make much of a difference at all. You might be able to save a minute or two out of the 6 or 7 total. But it's mostly going to be bound on how long it takes to read everything from the API.


  1. Maybe. Scheduling is hard! There are also work-stealing schedulers, which allow the use of multiple channels instead of just one. This is a different kind of M:N strategy.
