Runtime crash double free or corruption (out) Aborted

I'm trying to create a benchmarking tool, but I keep getting a crash with a message like this:

double free or corruption (out) Aborted

Does anyone know why I get double free or corruption?

Here are all the error messages:

Here is my code:

use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Instant;
use tokio::sync::Semaphore;
use tokio::time::Duration;

type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;

#[tokio::main]
async fn main() -> Result<()> {
    let url = "http://127.0.0.1:8080/4";
    
    let start = Instant::now();
    let success = Arc::new(AtomicUsize::new(0));
    let total = Arc::new(AtomicUsize::new(0));
    
    let semaphore = Arc::new(Semaphore::new(200));
    
    let client = reqwest::Client::builder()
        .pool_max_idle_per_host(200)  
        .pool_idle_timeout(Duration::from_secs(60))
        .timeout(Duration::from_secs(2))
        .build()?;
    
    let client = Arc::new(client);

    let mut spawn_count = 0;
    
    while start.elapsed().as_secs() < 15 {
        let client_clone = client.clone();
        let success_clone = success.clone();
        let total_clone = total.clone();
        let sem_clone = semaphore.clone();
        
        tokio::spawn(async move {
            let _permit = sem_clone.acquire().await.unwrap();
            
            match client_clone.get(url).send().await {
                Ok(response) if response.status().is_success() => {
                    success_clone.fetch_add(1, Ordering::Relaxed);
                }
                _ => {}
            }
            
            total_clone.fetch_add(1, Ordering::Relaxed);
        });
        
        spawn_count += 1;
    }

    let spawn_time = start.elapsed();

    while start.elapsed().as_secs() < 20 {
        let current_total = total.load(Ordering::Relaxed);
        if current_total >= spawn_count {
            break;
        }
        tokio::task::yield_now().await;
    }

    let final_success = success.load(Ordering::Relaxed);
    let final_total = total.load(Ordering::Relaxed);
    let final_time = start.elapsed();
    
    println!("Spawned: {} requests in {:.3}s", spawn_count, spawn_time.as_secs_f64());
    println!("Completed: {} requests", final_total);
    println!("Success: {}/{} ({:.2}%)", 
        final_success, final_total, 
        (final_success as f64 / final_total.max(1) as f64) * 100.0);
    println!("Total time: {:.3}s", final_time.as_secs_f64());
    println!("RPS: {:.0}", final_total as f64 / final_time.as_secs_f64());

    Ok(())
}

your code looks rather innocent to me, but memory corruption is usually associated with unsound code.

can you post the full error message please?

The full error message is just this:

double free or corruption (out) Aborted

I added the screenshot link in the post

since there are no obvious issues in the code, I'm guessing it's probably a memory allocator problem.

can you provide more details about the environment your program is running in, e.g. target cpu architecture, os version, libc version, vm or containerization, memory limitation, things along those lines?

I use Android 15 (ARM64) with 8 cores, a total of 3.5 GB RAM, and about 850 MB free RAM, running Arch Linux under Termux proot-distro; the libc version is 2.41.

At first I thought it was resource exhaustion, but even after lowering the concurrency to 50 and the duration to 2 seconds, the crash still happened, while my backend never crashed even though both run on the same machine.

After making some changes, the current code no longer crashes like the previous one. I am still figuring out what actually caused the crash. Was it really resource exhaustion? Does the batching really solve it?

The current code is like this:

use std::sync::{Arc, atomic::{AtomicU64, Ordering}};
use std::time::Instant;
use tokio::sync::Semaphore;
use tokio::time::Duration;
use tokio::task::JoinSet;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let url = "http://127.0.0.1:8080/4";
    
    let start = Instant::now();
    let success = Arc::new(AtomicU64::new(0));
    let total = Arc::new(AtomicU64::new(0));
    
    let semaphore = Arc::new(Semaphore::new(200));
    let client = Arc::new(
        reqwest::Client::builder()
            .pool_max_idle_per_host(200)
            .pool_idle_timeout(Duration::from_secs(40))
            .timeout(Duration::from_secs(5))
            .build()?,
    );

    let mut join_set = JoinSet::new();
    let batch_size = 100;
    let max_concurrent = 200;
    let mut spawned_total = 0u64;

    while start.elapsed().as_secs() <= 30 {
        for _ in 0..batch_size {
            if start.elapsed().as_secs() >= 30 {
                break;
            }

            let client_clone = client.clone();
            let success_clone = success.clone();
            let total_clone = total.clone();
            let sem_clone = semaphore.clone();

            join_set.spawn(async move {
                let _permit = sem_clone.acquire().await.unwrap();

                match client_clone.get(url).send().await {
                    Ok(resp) if resp.status().is_success() => {
                        success_clone.fetch_add(1, Ordering::Relaxed);
                    }
                    _ => {}
                }
                total_clone.fetch_add(1, Ordering::Relaxed);
            });

            spawned_total += 1;
        }

        while join_set.len() > max_concurrent {
            if let Some(_) = join_set.try_join_next() {
                continue;
            }
            tokio::task::yield_now().await;
        }
    }

    while let Some(_) = join_set.join_next().await {}

    let duration = start.elapsed();
    let total_requests = total.load(Ordering::Relaxed);
    let success_requests = success.load(Ordering::Relaxed);
    let req_per_sec = total_requests as f64 / duration.as_secs_f64();
    
    println!(
        "Success: {}/{} in {:.2} seconds",
        success_requests,
        total_requests,
        duration.as_secs_f64()
    );
    
    println!("Requests per second: {:.2}", req_per_sec);
    
    if total_requests > 0 {
        println!(
            "Success rate: {:.2}%",
            (success_requests as f64 / total_requests as f64) * 100.0
        );
    } else {
        println!("Success rate: 0.00%");
    }

    println!("Total spawned: {}", spawned_total);

    Ok(())
}

I am also still trying to improve performance, then the features. The old version could spawn 700,000 connections in 15 seconds, while the current version only manages about 400,000 connections in a longer 30-second duration (with the same max concurrency of 200). If you have any ideas to improve the performance, please tell me.

the error might be a real double drop, or it might be the internal metadata maintained by malloc being corrupted, but either way, it's definitely a critical memory safety issue and an indication of possible UB somewhere in the program.

in theory, it could be caused by exhaustion, e.g. there's a chance that on android the kernel is configured differently from "regular" desktop/server linux systems, but I'm not familiar with Android; we need opinions from android experts on this.

your original code uses a semaphore to control the concurrent in-flight requests, but the semaphore doesn't limit the number of tasks spawned into the scheduler. the code keeps spawning new tasks during the whole 2 second period; it's just that most tasks were immediately suspended when they ran, due to the semaphore.

in other words, reducing the concurrency limit alone does NOT reduce the memory consumption needed. reducing the time period does reduce the number of tasks being spawned, but I think 2 seconds is still a long period if the code keeps spawning new tasks repeatedly.

the modified code uses a join set to limit the total number of spawned tasks. if I read your code correctly, the maximum number of spawned tasks should be batch_size + max_concurrent, so it's still higher than max_concurrent, but NOT unbounded like before, so the memory consumption is also limited.

so it's possible memory exhaustion could indeed be the root cause of the allocator corruption.
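as a side note, if you want the semaphore itself to also bound the number of live tasks, one option is to acquire an owned permit before spawning. a minimal sketch (not a full benchmark; the numbers are placeholders):

use std::sync::Arc;
use tokio::sync::Semaphore;
use tokio::task::JoinSet;

#[tokio::main]
async fn main() {
    let semaphore = Arc::new(Semaphore::new(200));
    let mut join_set = JoinSet::new();

    for i in 0..1_000u64 {
        // waits here whenever 200 tasks are already alive, so the number of
        // spawned-but-unfinished tasks can never exceed the semaphore size
        let permit = semaphore.clone().acquire_owned().await.unwrap();
        join_set.spawn(async move {
            let _permit = permit; // dropped (released) when the task finishes
            // ... the actual request would go here ...
            i
        });
    }

    while join_set.join_next().await.is_some() {}
}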

I'm not sure what performance figure you are measuring: is it the number of concurrent tasks, the number of concurrent requests/connections, or the rate of spawning new tasks?

you are limiting the concurrent requests using the semaphore, so the number of concurrent requests stays very low, regardless of how many tasks you spawn.

on the other hand, the max number of tasks is primarily determined by how much memory tokio's scheduler can allocate.

to measure the server performance, why not use existing tools? for example, with wrk:

# 8 threads, 200 concurrent connections, 30 second duration:
$ wrk -t8 -c200 -d30s http://127.0.0.1:8080/4

Out of an abundance of caution: you accidentally hit the "N" key instead of the "B" key, correct? If not, what does UN mean? I've seen US used for "unspecified", but I've not seen UN.

Can someone explain how final_success can be larger than final_total in the original code? I'm guessing it has something to do with atomics and the use of relaxed ordering, but it seems surprising that println!("Success:…) shows final_success as one higher than final_total.

Edit

Never mind. I didn't see that the JoinHandle returned from tokio::task::spawn wasn't awaited.

yeah, it's a typo, it was meant to be UB.

good eye! I didn't read the numbers carefully enough to notice. but you are right, it's just a consequence of the relaxed atomic operations.

the code doesn't do any synchronization when printing the statistics, at which point many tasks are still being run concurrently by tokio's worker threads. because all the atomic operations are relaxed, it's completely normal for the "main" task to see "outdated" values; that's just the nature of relaxed memory ordering.
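to make that concrete, here is a standalone toy program (not your code, just the same pattern): the worker bumps the success counter before the total counter, so a snapshot taken in between can observe success one higher than total, even though both counters are atomic:

use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;

// the worker increments SUCCESS first, then TOTAL, both relaxed
static SUCCESS: AtomicUsize = AtomicUsize::new(0);
static TOTAL: AtomicUsize = AtomicUsize::new(0);

fn main() {
    let worker = thread::spawn(|| {
        for _ in 0..1_000_000 {
            SUCCESS.fetch_add(1, Ordering::Relaxed);
            TOTAL.fetch_add(1, Ordering::Relaxed);
        }
    });

    // the "main" side snapshots both counters without any synchronization;
    // it can read SUCCESS after an increment but TOTAL before the matching one
    let mut anomalies = 0;
    for _ in 0..1_000_000 {
        let s = SUCCESS.load(Ordering::Relaxed);
        let t = TOTAL.load(Ordering::Relaxed);
        if s > t {
            anomalies += 1;
        }
    }
    worker.join().unwrap();
    println!("observed success > total {} times", anomalies);
}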

however, these measurements are, at best, just rough estimates, so I wouldn't pay too much attention to them anyway.

Indeed, but my question actually came from misreading the code. I thought all the tasks were completed by the time the printlns were run, so I expected the numbers to be the same. Of course, why would the program abort due to memory corruption after main was pretty much done? It was a silly mistake on my part. I apologize.

Thank you so much for the detailed explanation.

So it is still quite hard to pinpoint the exact root cause right now; the most likely cause is resource exhaustion. Anyway, I have just made a new version of the code. I decided to drop the atomics completely, since I need to store all the latency data for further statistical analysis (like median and mode). I am also working on categorizing latencies into ranges and counting how many fall into each one (0–50 ms, 50–100 ms, 100–150 ms, 150–200 ms, and 200+ ms), e.g. how many requests took between 150 and 200 ms, so it gives more detailed information (sketched right below). On top of that, I plan to add the ability to test multiple endpoint URLs at the same time, e.g. "/url_1" and "/url_2" together, to get a clear view of the performance drop caused by long CPU-bound code inside the async event loop threads (in tokio, the worker threads). I also plan to add an easy-to-use perf wrapper function to track other stats like CPU usage, cache misses, branch misses, and RAM usage.
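For the latency ranges, something like this is what I have in mind (just a sketch; the bucket boundaries are the ones listed above):

use std::time::Duration;

// count latencies into the ranges 0-50, 50-100, 100-150, 150-200 and 200+ ms
fn bucket_latencies(latencies: &[Duration]) -> [usize; 5] {
    let mut buckets = [0usize; 5];
    for latency in latencies {
        let idx = match latency.as_millis() {
            0..=49 => 0,
            50..=99 => 1,
            100..=149 => 2,
            150..=199 => 3,
            _ => 4,
        };
        buckets[idx] += 1;
    }
    buckets
}

fn main() {
    let samples = [Duration::from_millis(12), Duration::from_millis(180), Duration::from_millis(230)];
    println!("{:?}", bucket_latencies(&samples)); // [1, 0, 0, 1, 1]
}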

The reason I do not use wrk is that it counts every request, regardless of whether it returned 200 OK or failed. I noticed this a few months ago when testing a simple scenario. I wanted to validate whether wrk was reliable, because I saw that my custom HTTP framework seemed to outperform Axum, Actix, etc., which made me wonder if wrk itself might be buggy. To check it, I created a simple Axum backend with a shared counter like this:

// pseudo code, not actual working code
fn count_handler() -> String {
    total_req += 1;
    format!("current total reqs: {}", total_req)
}
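A runnable equivalent of that counter idea would look roughly like this (assuming axum 0.7 and a global atomic counter; the route and port are just placeholders):

use axum::{routing::get, Router};
use std::sync::atomic::{AtomicU64, Ordering};

// shared counter, incremented on every request to the endpoint
static TOTAL_REQ: AtomicU64 = AtomicU64::new(0);

async fn count_handler() -> String {
    let n = TOTAL_REQ.fetch_add(1, Ordering::Relaxed) + 1;
    format!("current total reqs: {}", n)
}

#[tokio::main]
async fn main() {
    let app = Router::new().route("/count", get(count_handler));
    let listener = tokio::net::TcpListener::bind("127.0.0.1:8080").await.unwrap();
    axum::serve(listener, app).await.unwrap();
}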

Then I ran wrk against this counter endpoint. Since the counter increments on every incoming request, I expected wrk's reported request count to match the counter value. After wrk finished, I visited the counter endpoint manually in the browser to see the number; at that point, the counter should equal wrk's result + 1 (because of my manual visit). But the numbers did not match: wrk's total was much higher, over 2x. That is when I decided wrk was not reliable for me, since it gives the illusion of extremely high throughput even when the actual server counter shows a very different number.

After that, I tried k6, and its results matched my backend's counter perfectly. That gave me confidence that k6 is the correct tool. Then I checked and saw k6 is written in Go, which made me think that if I build my own tool in Rust, I should be able to spawn even more connections. And indeed, my tool can reach 390,000 connections compared to k6's 310,000 under the same duration and max concurrency.

But I am still investigating latency: my tool shows a higher max latency than k6 even though the tested backend is the same. The max latency can reach 200–300 ms, while k6 maxes out around 150 ms. I am not sure yet whether this is caused by the extra 80,000 connections or by unoptimized code on my side. Do you spot any possible causes for that in the code below?

Sorry for any bad English grammar, I'm using a translator :sweat_smile:

use std::sync::Arc;
use std::time::Instant;
use tokio::sync::Semaphore;
use tokio::time::Duration;
use tokio::task::JoinSet;
use crossbeam_channel::unbounded;
use tokio::runtime::Runtime;
use std::collections::HashMap;

struct Data {
    time: Option<Duration>,
    total_send: Option<u64>,
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let (s, r) = unbounded::<Data>();

    let start = Instant::now();

    let runtime = Runtime::new().unwrap();
    runtime.block_on(async {
        let url = "http://127.0.0.1:8080/4";

        let semaphore = Arc::new(Semaphore::new(200));
        let client = Arc::new(
            reqwest::Client::builder()
                .pool_max_idle_per_host(200)
                .pool_idle_timeout(Duration::from_secs(40))
                .timeout(Duration::from_secs(5))
                .build()
                .unwrap(),
        );

        let mut join_set = JoinSet::new();
        let batch_size = 100;
        let max_concurrent = 200;

        let s_ref = s.clone();

        while start.elapsed().as_secs() <= 30 {
            for _ in 0..batch_size {
                if start.elapsed().as_secs() >= 30 {
                    break;
                }

                let client_ref = client.clone();
                let sem_ref = semaphore.clone();
                let s_ref = s_ref.clone();

                join_set.spawn(async move {
                    let _permit = sem_ref.acquire().await.unwrap();

                    let request_start = Instant::now();

                    match client_ref.get(url).send().await {
                        Ok(resp) if resp.status().is_success() => {
                            let data = Data {
                                time: Some(request_start.elapsed()),
                                total_send: None,
                            };
                            s_ref.send(data).unwrap();
                        }
                        _ => {}
                    }
                    let data = Data {
                        time: None,
                        total_send: Some(1),
                    };
                    s_ref.send(data).unwrap();
                });
            }

            while join_set.len() > max_concurrent {
                if let Some(_) = join_set.try_join_next() {
                    continue;
                }
                tokio::task::yield_now().await;
            }
        }

        while let Some(_) = join_set.join_next().await {}
    });

    let duration = start.elapsed();

    drop(s);

    let mut times = vec![];
    let mut total_send = 0;

    for val in r.iter() {
        if let Some(val) = val.time {
            times.push(val.as_nanos() as u64);
        } else if let Some(val) = val.total_send {
            total_send += val;
        }
    }

    times.sort();
    let success = times.len();
    let total_times = times.iter().sum::<u64>();
    let req_per_sec = success as f64 / duration.as_secs_f64();

    let min_time = times.iter().min().unwrap();
    let max_time = times.iter().max().unwrap();

    println!(
        "Success: {}/{} in {:.2} seconds",
        success,
        total_send,
        duration.as_secs_f64()
    );

    println!("Requests per second: {:.2}", req_per_sec);

    if total_send > 0 {
        println!(
            "Success rate: {:.2}%",
            (success as f64 / total_send as f64) * 100.0
        );
    } else {
        println!("Success rate: 0.00%");
    }

    let min_ms = *min_time as f64 / 1_000_000.0;
    let max_ms = *max_time as f64 / 1_000_000.0;
    let avg_ms = (total_times as f64 / success as f64) / 1_000_000.0;

    let median_ms = if success % 2 == 0 {
        let mid = success / 2;
        (times[mid - 1] + times[mid]) as f64 / 2.0 / 1_000_000.0
    } else {
        times[success / 2] as f64 / 1_000_000.0
    };

    let mut freq: HashMap<u64, usize> = HashMap::new();
    for &t in &times {
        *freq.entry(t).or_insert(0) += 1;
    }
    let (mode_val, _) = freq.into_iter().max_by_key(|&(_, count)| count).unwrap();
    let mode_ms = mode_val as f64 / 1_000_000.0;

    let p90_idx = (0.90 * (success as f64 - 1.0)) as usize;
    let p99_idx = (0.99 * (success as f64 - 1.0)) as usize;
    let p90_ms = times[p90_idx] as f64 / 1_000_000.0;
    let p99_ms = times[p99_idx] as f64 / 1_000_000.0;

    println!("Min:    {:.2} ms", min_ms);
    println!("Max:    {:.2} ms", max_ms);
    println!("Avg:    {:.2} ms", avg_ms);
    println!("Median: {:.2} ms", median_ms);
    println!("Mode/Modus:   {:.2} ms", mode_ms);
    println!("p90:    {:.2} ms", p90_ms);
    println!("p99:    {:.2} ms", p99_ms);

    Ok(())
}

as I said in my previous reply, the code as written did NOT open more network connections than the concurrency limit, so I'm not quite sure what you mean when you say "xxx connections"; do you actually mean the server throughput, as in requests per second?

anyway, your code spawned many tokio tasks, but the majority of them immediately blocked on the semaphore, and no new requests could be made until previous tasks finished and released a semaphore permit.

I'm not sure what the reason is for the longer latency you saw. the excessive number of spawned tasks can increase the workload of tokio's task scheduler, and also the overhead of the semaphore, which needs to maintain the list of all the tasks waiting on it, although I believe this overhead should not be too big for tokio's implementation, which is very scalable.

you can try an alternative, simpler architecture, where you only spawn max_concurrent tasks, let's call them "client" tasks, with each client sitting in a loop making requests. after spawning the clients, the "main" task can simply go to sleep for the configured duration, then notify all the clients to stop (this can be done with a simple atomic flag, or more sophisticated synchronization such as a CancellationToken).

each client task maintains its own local statistics so no synchronization is required, and returns them when it finishes. the main task awaits all the client tasks' return values and merges the numbers to generate the final result.
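a rough sketch of what I mean (untested; the url, test duration, and concurrency are placeholders copied from your earlier snippets):

use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Instant;
use tokio::time::Duration;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let url = "http://127.0.0.1:8080/4";
    let max_concurrent = 200;
    let stop = Arc::new(AtomicBool::new(false));
    let client = reqwest::Client::builder()
        .timeout(Duration::from_secs(5))
        .build()?;

    // spawn exactly max_concurrent "client" tasks, each looping on requests
    let mut handles = Vec::new();
    for _ in 0..max_concurrent {
        let client = client.clone();
        let stop = stop.clone();
        handles.push(tokio::spawn(async move {
            let (mut success, mut total) = (0u64, 0u64);
            let mut latencies = Vec::new();
            while !stop.load(Ordering::Relaxed) {
                let started = Instant::now();
                if let Ok(resp) = client.get(url).send().await {
                    if resp.status().is_success() {
                        success += 1;
                        latencies.push(started.elapsed());
                    }
                }
                total += 1;
            }
            // local statistics are returned, no shared state needed
            (success, total, latencies)
        }));
    }

    // the main task just sleeps for the test duration, then signals the clients
    tokio::time::sleep(Duration::from_secs(30)).await;
    stop.store(true, Ordering::Relaxed);

    // merge the per-client numbers into the final result
    let (mut success, mut total, mut latencies) = (0u64, 0u64, Vec::new());
    for handle in handles {
        let (s, t, mut l) = handle.await?;
        success += s;
        total += t;
        latencies.append(&mut l);
    }
    println!("Success: {}/{}, latency samples: {}", success, total, latencies.len());
    Ok(())
}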

Try running under valgrind. If you do it with debug symbols it should be able to accurately point out the problematic code (it probably is not in the code you pasted since you don't use any unsafe blocks).
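For example (the binary name is a placeholder):

# debug builds already include symbols; run the binary under valgrind's default memcheck tool
$ cargo build
$ valgrind ./target/debug/my-bench-tool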