Maxing out S3 -> EC2 bandwidth with Rusoto

Hey everyone,
I was trying to download some files from AWS S3 to an EC2 instance using Rusoto and figured it would be possible to almost max out the network bandwidth. Unfortunately I wasn't able to get close, and I'm wondering if anyone has ideas about what I might be doing wrong. As a sanity check, I did an implementation in Go that was able to download at about 80% of the AWS-advertised maximum bandwidth.

Summary

Here's the throughput I was able to get on an EC2 instance that is supposed to get 25 gigabits per second (roughly 3 GB/s).
(Note that all the Rust examples are compiled in release mode, and none of these write to disk; they just read the data into memory.)

  1. Go - 2.5 GB/s
  2. Rust-S3 blocking with Tokio 0.3 - 2.3 GB/s
  3. Rust-S3 blocking with Tokio 0.2 - 1.5 GB/s
  4. Rusoto - 1.0 GB/s

My main question is: why is Rusoto so much slower than the first two cases (Go and Rust-S3 with Tokio 0.3)? I don't need it to beat the Go implementation, but it would be nice to at least get close.

Details

In each case we set up some number of workers that download 8 MB files from S3, 1000 files in total. I tried different numbers of workers, and each implementation seemed to reach its maximum bandwidth somewhere in the range of 40-100 workers. All Rust code uses cargo 1.49.0-nightly (dd83ae55c 2020-10-20) in release mode. Everything was run on an inf1.6xlarge instance running Ubuntu 20.04.

Here are code snippets showing the main logic for each example (full code here)

Rusoto
// Claim one download job from the shared counter; returns false once all
// samples have been claimed.
async fn take_job(to_do: &Mutex<u32>) -> bool {
    let mut jobs_left = to_do.lock().await;
    if *jobs_left == 0 {
        false
    } else {
        *jobs_left -= 1;
        true
    }
}

pub async fn download_test(
    num_workers: u64,
    samples: u32,
    region: Region,
    bucket: &str,
    key_prefix: &str,
) {
    let client = Arc::new(S3Client::new(region));

    let start = std::time::Instant::now();
    let to_do = Arc::new(Mutex::new(samples));
    let mut tasks = vec![];
    for i in 0..num_workers {
        let client_copy = Arc::clone(&client);
        let to_do_copy = Arc::clone(&to_do);
        let key = format!("{}-{}", key_prefix, i % 1000);
        let bucket = bucket.to_string();
        let task = tokio::task::spawn(async move {
            let mut worker_first = std::time::Duration::new(0, 0);
            let mut worker_last = std::time::Duration::new(0, 0);
            let mut worker_bytes = 0;
            loop {
                if take_job(to_do_copy.as_ref()).await {
                    let start = tokio::time::Instant::now();
                    let request = GetObjectRequest {
                        bucket: bucket.clone(),
                        key: key.to_string(),
                        ..Default::default()
                    };
                    let response = client_copy
                        .get_object(request)
                        .await
                        .expect("Error getting object");
                    worker_first += start.elapsed();
                    let body = response.body.unwrap();
                    let mut reader = body.into_async_read();
                    let mut buf = Vec::new();
                    reader
                        .read_to_end(&mut buf)
                        .await
                        .expect("Error writing to buffer");
                    worker_last += start.elapsed();
                    worker_bytes += buf.len()
                } else {
                    break;
                }
            }
            (worker_first, worker_last, worker_bytes)
        });
        tasks.push(task);
    }
    let mut total_first = std::time::Duration::new(0, 0);
    let mut total_last = std::time::Duration::new(0, 0);
    let mut total_bytes = 0;
    for task in tasks {
        let (first, last, bytes) = task.await.unwrap();
        total_first += first;
        total_last += last;
        total_bytes += bytes;
    }
    let time = start.elapsed();
    println!(
        "{:?}, {} MB, {} MB/s, {:?}, {:?}",
        time,
        total_bytes / (1024 * 1024),
        total_bytes as f64 / (time.as_secs_f64() * 1024.0 * 1024.0),
        total_first.checked_div(samples),
        total_last.checked_div(samples),
    );
}
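
Note that read_to_end starts from an empty Vec here, so the buffer grows as the body streams in. A pre-sized variant of the body-reading part would look roughly like this (an untested sketch, using the same AsyncReadExt::read_to_end as above):

// Untested variant: pre-size the buffer from the Content-Length header so
// read_to_end doesn't have to reallocate while the body streams in.
let len = response.content_length.unwrap_or(0) as usize;
let mut reader = response.body.unwrap().into_async_read();
let mut buf = Vec::with_capacity(len);
reader
    .read_to_end(&mut buf)
    .await
    .expect("Error reading body");
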
Rust-S3
// Same job-claiming helper as above, but with a std::sync::Mutex since the
// workers here are OS threads rather than Tokio tasks.
fn take_job(to_do: &Mutex<u32>) -> bool {
    let mut jobs_left = to_do.lock().unwrap();
    if *jobs_left == 0 {
        false
    } else {
        *jobs_left -= 1;
        true
    }
}

fn do_downloads(num_workers: u32, samples: u32, bucket: &str, key_prefix: &str, region: &str) {
    let to_do = Arc::new(Mutex::new(samples));
    let mut jobs = vec![];
    let start = std::time::Instant::now();
    let bucket = Arc::new(
        Bucket::new(
            bucket,
            region.parse().unwrap(),
            Credentials::default().unwrap(),
        )
        .unwrap(),
    );
    for i in 0..num_workers {
        let to_do = Arc::clone(&to_do);
        let bucket = Arc::clone(&bucket);
        let key = format!("{}-{}", key_prefix, i % 1000);
        let job = std::thread::spawn(move || {
            let mut worker_bytes = 0;
            loop {
                if take_job(to_do.as_ref()) {
                    let (data, code) = bucket.get_object_blocking(&key).unwrap();
                    assert_eq!(code, 200);
                    worker_bytes += data.len();
                } else {
                    break;
                }
            }
            worker_bytes
        });
        jobs.push(job)
    }

    let total_bytes = jobs
        .into_iter()
        .fold(0, |accum, job| accum + job.join().unwrap());
    let time = start.elapsed();
    println!(
        "{:?}, {}, {} MB/s",
        time,
        total_bytes,
        total_bytes as f64 / (time.as_secs_f64() * 1024.0 * 1024.0)
    );
}
Go
type stats struct {
	bytes     int64
	firstByte time.Duration
	lastByte  time.Duration
}

func runTests(s3Client *s3.S3, workers int, samples int) {
	testTasks := make(chan int, workers)

	// a channel to receive results from the test tasks back on the main thread
	results := make(chan stats, samples)

	// create the workers for all the threads in this test
	for w := 1; w <= workers; w++ {
		go func(o int, tasks <-chan int, results chan<- stats) {
			for range tasks {
				// generate the S3 key from the key prefix and the worker index
				key := fmt.Sprintf("%v-%d", keyPrefix, o%1000)

				// start the timer to measure the first byte and last byte latencies
				latencyTimer := time.Now()

				// do the GetObject request
				req := s3.GetObjectInput{
					Bucket: aws.String(bucket),
					Key:    aws.String(key),
				}

				resp, err := s3Client.GetObject(&req)

				// if a request fails, exit
				if err != nil {
					panic("Failed to get object: " + err.Error())
				}

				// measure the first byte stats
				firstByte := time.Now().Sub(latencyTimer)
				bytes := *resp.ContentLength

				// create a buffer to copy the S3 object body to
				var buf = make([]byte, bytes)

				// read the s3 object body into the buffer
				size := 0
				for {
					n, err := resp.Body.Read(buf)

					size += n

					if err == io.EOF {
						break
					}

					// if the streaming fails, exit
					if err != nil {
						panic("Error reading object body: " + err.Error())
					}
				}

				_ = resp.Body.Close()

				// measure the last byte stats
				lastByte := time.Now().Sub(latencyTimer)

				// add the stats result to the results channel
				results <- stats{bytes, firstByte, lastByte}
			}
		}(w, testTasks, results)
	}

	// start the timer for this benchmark
	benchmarkTimer := time.Now()

	// submit all the test tasks
	for j := 1; j <= samples; j++ {
		testTasks <- j
	}

	// close the channel
	close(testTasks)

	sumFirstByte := int64(0)
	sumLastByte := int64(0)
	totalBytes := int64(0)
	// wait for all the results to come and collect the individual datapoints
	for s := 1; s <= samples; s++ {
		stats := <-results
		sumFirstByte += stats.firstByte.Nanoseconds()
		sumLastByte += stats.lastByte.Nanoseconds()
		totalBytes += stats.bytes
	}

	// stop the timer for this benchmark
	totalTime := time.Now().Sub(benchmarkTimer)

	// calculate the throughput rate
	rate := (float64(totalBytes)) / (totalTime.Seconds()) / 1024 / 1024

	// print the results to stdout
	fmt.Printf(
		"%9.4f s, %v B, %6.1f MB/s, %5.0f ms, %5.0f ms\n",
		totalTime.Seconds(),
		totalBytes,
		rate,
		float64(sumFirstByte)/float64(samples)/1000000,
		float64(sumLastByte)/float64(samples)/1000000)
}

Some Differences in Syscalls

Rusoto is using Tokio 0.2, and the difference between the two blocking Rust-S3 versions suggests that part of the gap could be due to changes in the Tokio I/O driver. When running strace on the three Rust versions, I see that Rusoto and Rust-S3 with Tokio 0.2 make a lot of calls to sched_yield. Rust-S3 with Tokio 0.3 instead seems to just end up in epoll_wait while reading data (and makes an order of magnitude fewer syscalls than the Tokio 0.2 version). Go, on the other hand, uses a mix of epoll_pwait and nanosleep.

Side Question on Tokio Version

The blocking version of Rust-S3 just calls the async version inside Runtime::block_on. Since Rust-S3 depends on Tokio 0.2, I'd expect it to use the 0.2 runtime no matter what, but the behavior does seem to change when my application depends on Tokio 0.3. How does that work?
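
To make the question concrete, here's roughly what I picture a *_blocking wrapper doing. This is a simplified sketch of my assumption, not the actual rust-s3 source, and the body of the async block is just a placeholder:

// Simplified sketch (my assumption, not the real rust-s3 code): build a
// Tokio 0.2 runtime and drive the async request to completion on it.
fn get_object_blocking_sketch() -> Vec<u8> {
    // In Tokio 0.2, Runtime::block_on takes &mut self.
    let mut rt = tokio::runtime::Runtime::new().expect("failed to build Tokio runtime");
    rt.block_on(async {
        // placeholder for the real async call, e.g. bucket.get_object(&key)
        Vec::new()
    })
}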

Also, it seems like the async version of Rust-S3 doesn't work with Tokio 0.3, so why does the blocking version work with it?