What is the highest-performance way to handle more than 5 million Tokio tasks with a time limit, and then drop the ones that don’t finish within that duration?

I’ve got 2 ways to do what’s mentioned in the title, and I’ve been benchmarking them, but I’m not fully sure my benchmarking approach is correct. Is there anyone here who could add more insight?

So I have shared code like this:

pub struct Pool {
    tx: flume::Sender<tokio::net::TcpStream>,
    rx: flume::Receiver<tokio::net::TcpStream>,
    i: std::sync::atomic::AtomicUsize,
    host: String,
    req: String,
    max: usize,
}

impl Pool {
    // async fn new ....

    pub async fn get(&self) -> Option<tokio::net::TcpStream> {
        if let Ok(conn) = self.rx.try_recv() {
            Some(conn)
        } else {
            // Reserve a slot before opening a new connection, up to `max`.
            let current = self.i.load(std::sync::atomic::Ordering::Relaxed);
            if current < self.max
                && self
                    .i
                    .compare_exchange(
                        current,
                        current + 1,
                        std::sync::atomic::Ordering::AcqRel,
                        std::sync::atomic::Ordering::Relaxed,
                    )
                    .is_ok()
            {
                Some(tokio::net::TcpStream::connect(&self.host).await.unwrap())
            } else {
                None
            }
        }
    }

    pub async fn put(&self, conn: tokio::net::TcpStream) {
        self.tx.send_async(conn).await.unwrap();
    }
}

Then I reuse the connection to make HTTP requests.
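The snippets below call client.send_get() and client.put(val); send_get isn’t shown here, but a minimal sketch of its shape (assumed from how it’s used below, not the exact implementation) would be:

// Sketch only: borrow a connection from the pool, write the pre-built request,
// read the first chunk of the response. The (bool, Option<TcpStream>) return
// shape is assumed from the benchmark code below.
impl Pool {
    pub async fn send_get(&self) -> (bool, Option<tokio::net::TcpStream>) {
        use tokio::io::{AsyncReadExt, AsyncWriteExt};

        // Reuse a pooled connection, or open a new one if under the limit.
        let Some(mut conn) = self.get().await else {
            return (false, None);
        };

        // On any I/O error the connection is dropped instead of returned.
        if conn.write_all(self.req.as_bytes()).await.is_err() {
            return (false, None);
        }

        let mut buf = [0u8; 4096];
        match conn.read(&mut buf).await {
            Ok(n) if n > 0 => (true, Some(conn)),
            _ => (false, None),
        }
    }
}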

The 1st version is 1 outer Tokio timeout plus 5 million inner Tokio timeouts:

tokio::time::timeout(tokio::time::Duration::from_secs(30),
    tokio::spawn(async move {
        loop {
            if http_context_ref
                .count
                .load(std::sync::atomic::Ordering::Relaxed)
                < max_concurrent
            {
                let http_context_ref = http_context_ref.clone();

                tokio::time::timeout(tokio::time::Duration::from_secs(30),
                tokio::spawn(async move {
                    http_context_ref
                        .count
                        .fetch_add(1, std::sync::atomic::Ordering::Relaxed);

                    let _permit = http_context_ref.semaphore.acquire().await.unwrap();

                    let request_start = tokio::time::Instant::now();
                    
                    let (cek, conn) = http_context_ref.client.send_get().await;

                    http_context_ref
                        .count
                        .fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
                    http_context_ref.notify.notify_one();

                    if let Some(val) = conn {
                        let data = if cek {
                            crate::Data {
                                time: Some(request_start.elapsed()),
                                total_send: 1,
                            }
                        } else {
                            crate::Data {
                                time: None,
                                total_send: 1,
                            }
                        };

                        http_context_ref.client.put(val).await;
                        http_context_ref.s.send(data).unwrap();
                    }
                })
                )
                .await
                .unwrap()
                .unwrap();

            } else {
                http_context_ref.notify.notified().await;
            }
        }

    })
    )
    .await
    .unwrap()
    .unwrap();

The 2nd version uses a JoinSet to keep the handles, then drops the JoinSet once the deadline is reached:

tokio::spawn(async move {
    let mut join_set = tokio::task::JoinSet::new();
    let deadline = start + tokio::time::Duration::from_secs(http_config.max_duration as u64);

    while tokio::time::Instant::now() < deadline {
        if http_context_ref
            .count
            .load(std::sync::atomic::Ordering::Relaxed)
            < max_concurrent
        {
            let http_context_ref = http_context_ref.clone();

            // Spawn onto the JoinSet so the handle is kept (and aborted on drop).
            join_set.spawn(async move {
                http_context_ref
                    .count
                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);

                let _permit = http_context_ref.semaphore.acquire().await.unwrap();

                let request_start = tokio::time::Instant::now();

                let (cek, conn) = http_context_ref.client.send_get().await;

                http_context_ref
                    .count
                    .fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
                http_context_ref.notify.notify_one();

                if let Some(val) = conn {
                    let data = if cek {
                        crate::Data {
                            time: Some(request_start.elapsed()),
                            total_send: 1,
                        }
                    } else {
                        crate::Data {
                            time: None,
                            total_send: 1,
                        }
                    };

                    http_context_ref.client.put(val).await;
                    http_context_ref.s.send(data).unwrap();
                }
            });

            if join_set.len() > 100 {
                while let Some(_) = join_set.try_join_next() {
                    continue;
                }
                tokio::task::yield_now().await;
            }
        } else {
            http_context_ref.notify.notified().await;
        }
    }

    // Dropping the JoinSet aborts every task that is still pending.
    drop(join_set);
}).await.unwrap();

My additional question for the second code: does saving 5 million handles consume a lot of RAM and potentially cause resource exhaustion? Which approach is better, draining the join set every time it crosses the threshold or not? If I drain it at every threshold like in this part, will that block a single thread from picking up tasks for a noticeable amount of time when the while condition is met?

if join_set.len() > 100 {
    while let Some(_) = join_set.try_join_next() {
        continue;
    }
    tokio::task::yield_now().await;
}

The issue I’m running into if I do not drop the tasks is that when a task takes longer than the time limit, the program won’t finish until all tasks are done. I think it might be because the channel isn’t fully closed. When I drop all the tasks after the time limit ends, it no longer gets stuck. What I want is for the program to finish within the time limit, even if some tasks are still running. Which method is clearly able to spawn and finish the most tasks without exceeding the time limit?

Why don't you profile and see how much it uses? Note that profiling is different from benchmarking.
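For release builds that usually means keeping debug info so the profiler can resolve your own function names; a minimal sketch (the binary name in the perf commands is a placeholder):

# Cargo.toml: keep debug symbols in release builds.
[profile.release]
debug = true

# Then, for example:
#   cargo build --release
#   perf record --call-graph dwarf ./target/release/mybench
#   perf report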

Also, what on earth are you doing 5 million concurrent HTTP requests for? I hope you are respecting the robots.txt of the sites you are hitting, and aren't running one of those awful crawling bots that are overloading the modern web.


Thanks a lot for the profiling tutorial link! I haven’t really done profiling before because I never got it to work properly: the function names that showed up weren’t the ones I wrote, even though I had already enabled debug info (debug = true). But I’ll give it another shot using the tutorial you shared, since it looks different from what I tried before.

Also, last night I tried running the benchmark again with a longer duration and a heavier backend endpoint (a Fibonacci calculation), and this time the difference became clear. The version that uses JoinSet was able to spawn and complete far more requests than the version that uses Tokio's timeout on each task: 340 vs 90 requests. Still, that's lower than wrk in my comparison (430 requests). But if I run join_set.join_all().await before dropping the JoinSet, it actually reaches 520 requests, since it waits for everything to finish. The thing is, that doesn't match my spec, because requests that go past the deadline shouldn't be counted, just like in wrk; any requests still pending after the time limit should be dropped. And no, I'm not making a crawler, but a tool to benchmark HTTP servers like wrk and k6, with extra features, and I'm trying to make it perform better than wrk. For example, it already includes a perf wrapper, and I just got the idea to add a profiler wrapper once I finally get profiling working.
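In other words, the two endings being compared look roughly like this (the helper and its names are made up for illustration):

// Sketch of the two shutdown behaviours compared above.
async fn finish_run(join_set: tokio::task::JoinSet<()>, count_late_requests: bool) {
    if count_late_requests {
        // Wait for every spawned request, so requests past the deadline count too.
        join_set.join_all().await;
    } else {
        // Abort everything still pending; only already-finished requests count.
        drop(join_set);
    }
}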


I’d solve it like this (rough sketch after the list):

  • don’t create millions of timers or job handles
  • create one CancellationToken and one task that sleeps until the deadline and calls cancel()
  • make a bounded work queue sized around the worker count
  • spawn a producer task that pushes all work items into the queue (stops on cancel)
  • create a bounded results queue
  • spawn workers = number of CPU threads * 2-4
  • each worker: recv → select! between doing the work and cancellation; on cancel, exit; otherwise send the result and loop
  • read results in a loop until the timeout hits or the queues close, then finish
  • drop all senders so receivers terminate cleanly
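A rough, self-contained sketch of that layout, assuming tokio, tokio-util (for CancellationToken) and flume (already used for the pool above); the real request is stubbed with a sleep, and the sizes, durations and counts are placeholders:

// Sketch only: worker pool + CancellationToken + bounded flume queues.
use std::time::Duration;
use tokio_util::sync::CancellationToken;

#[tokio::main]
async fn main() {
    let token = CancellationToken::new();
    let workers = std::thread::available_parallelism()
        .map(|n| n.get())
        .unwrap_or(4)
        * 2;

    // Bounded work and result queues sized around the worker count.
    let (work_tx, work_rx) = flume::bounded::<u64>(workers);
    let (result_tx, result_rx) = flume::bounded::<Duration>(workers);

    // One task sleeps until the deadline and cancels everything.
    let deadline_token = token.clone();
    tokio::spawn(async move {
        tokio::time::sleep(Duration::from_secs(30)).await;
        deadline_token.cancel();
    });

    // Producer: push work items until done or cancelled, then drop work_tx
    // so the work queue closes.
    let producer_token = token.clone();
    tokio::spawn(async move {
        for job in 0..5_000_000u64 {
            tokio::select! {
                _ = producer_token.cancelled() => break,
                res = work_tx.send_async(job) => {
                    if res.is_err() {
                        break;
                    }
                }
            }
        }
    });

    // Workers: receive a job, race the work against cancellation, report.
    for _ in 0..workers {
        let work_rx = work_rx.clone();
        let result_tx = result_tx.clone();
        let token = token.clone();
        tokio::spawn(async move {
            while let Ok(_job) = work_rx.recv_async().await {
                let started = tokio::time::Instant::now();
                tokio::select! {
                    _ = token.cancelled() => return,
                    // Swap this sleep for the real pooled HTTP request.
                    _ = tokio::time::sleep(Duration::from_millis(5)) => {}
                }
                if result_tx.send_async(started.elapsed()).await.is_err() {
                    return;
                }
            }
        });
    }

    // Drop the originals so the result queue closes once every worker exits.
    drop(result_tx);
    drop(work_rx);

    // Collector: only results that arrive before the queues close are counted.
    let mut completed = 0u64;
    while let Ok(_elapsed) = result_rx.recv_async().await {
        completed += 1;
    }
    println!("completed within deadline: {completed}");
}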
