Why is CPU usage always low with tokio, no matter how many worker threads I set?

I used actix_web to create a web application on Windows; my code is below. It reads 20000 files in different tasks, then sorts and calculates, but CPU usage always stays around 30%-40%, no matter how many threads I use. So its performance is obviously slower than my Go program. Can anyone help me?

use std::collections::HashMap;
use std::fs::File;
use std::io::{BufRead, BufReader};
use std::time::Instant;

use actix_web::{web, App, HttpResponse, HttpServer};

#[actix_web::main(flavor = "multi_thread", worker_threads = 10)]
async fn main() -> std::io::Result<()> {
    HttpServer::new(|| {
        App::new()
            .service(
                web::resource("/performance-testing/calculate")
                    .app_data(web::JsonConfig::default().limit(1024)) // <- limit size of the payload (resource level)
                    .route(web::get().to(calculate_data)),
            )
    })
    .bind(("127.0.0.1", 8089))?
    .run()
    .await
}

async fn calculate_from_file(filename:&str) -> f64  {
    let file = File::open(filename).unwrap();
    let reader = BufReader::new(file);
    let mut nums: Vec<f64> = Vec::with_capacity(10000);
    for line in reader.lines() {
        let line = line.unwrap();
        let fields: Vec<f64> = line
            .split('|')
            .filter(|s| !s.is_empty())
            .map(|s| fast_float::parse_partial::<f64, _>(s).unwrap().0)
            .collect();
        nums.extend(fields);
    }
    nums.sort_by(|a, b| a.partial_cmp(b).unwrap());

    let mut products: Vec<f64> = Vec::with_capacity(1000);
    for chunk in nums.chunks_exact(10) {
        products.push(chunk.iter().product());
    }

    return products.iter().sum();
}

async fn calculate_data(
    parameters: web::Query<HashMap<String, String>>,
)-> HttpResponse {
    let start_time = Instant::now();
    let count: usize = parameters
        .0
        .get("count")
        .unwrap_or(&"".to_string())
        .parse()
        .map_or_else(
            |_| 100 as usize,
            |v| {
                if v == 0 as usize {
                    usize::MAX
                } else {
                    v
                }
            },
        );

    let def_folder = "c:\\temp".to_string();
    let folder: &String = parameters
        .0
        .get("folder")
        .unwrap_or(&def_folder);


    let mut i = 0;
    //let mut set: JoinSet<_> = JoinSet::new();
    let mut tasks = Vec::with_capacity(1000);
    while i < count {
        let file_name = format!("{}/data_{}.txt",folder,i);
        tasks.push(tokio::spawn(async move {
            let res = calculate_from_file(&file_name).await;
            res
        }));
        i += 1;
    }

    let mut results = Vec::with_capacity(tasks.len());
    for task in tasks {
        results.push(task.await.unwrap());
    }
    let total:f64 = results.iter().sum();
    let mut result = HashMap::new();
    result.insert("result".to_string(), total);
    result.insert("cost".to_string(), start_time.elapsed().as_millis() as f64);
    return HttpResponse::Ok().json(result);
}

Compare with my Go test below: both read the same 20000 files, but Go runs at 80%-90% CPU and takes 13s, while this Rust version takes about 30s.

func CalculateData(w http.ResponseWriter, r *http.Request) {
	start := time.Now()
	str_count := r.URL.Query().Get("count")
	if len(str_count) == 0 {
		str_count = "100"
	}
	count64, _ := strconv.ParseInt(str_count, 10, 64)
	count := int(count64)
	var mylog LogWrapper
	iLog := mylog.Init(LOGFILE)
	folder := r.URL.Query().Get("folder")
	if len(folder) == 0 {
		folder = "c:/temp"
	}
	var wg_total sync.WaitGroup
	wg_total.Add(count)
	totals := make([]float64, count)
	for i := 0; i < count; i++ {
		index := i
		go func() {
			file, err := os.Open(folder + "/data_" + strconv.Itoa(index) + ".txt")
			if err != nil {
				iLog.Fatal(err)
			}
			defer file.Close()
			defer wg_total.Done()

			var nums []float64
			var wg sync.WaitGroup

			scanner := bufio.NewScanner(file)
			for scanner.Scan() {
				line := scanner.Text()
				fields := strings.Split(line, "|")
				for _, field := range fields {
					if len(field) != 0 {
						num, err := strconv.ParseFloat(field, 64)
						if err != nil {
							iLog.Fatal(err)
						}
						nums = append(nums, num)
					}
				}
			}

			sort.Float64s(nums)

			n := len(nums) / 10
			products := make([]float64, n)
			for i := 0; i < n; i++ {
				wg.Add(1)
				go func(i int) {
					defer wg.Done()
					start := i * 10
					end := (i + 1) * 10
					product := 1.0
					for j := start; j < end; j++ {
						product *= nums[j]
					}
					products[i] = product
				}(i)
			}

			wg.Wait()

			sum := 0.0
			for _, product := range products {
				sum += product
			}
			totals[index] = sum
		}()
	}
	wg_total.Wait()
	sum := 0.0
	for _, product := range totals {
		sum += product
	}
	var out calculate_out
	out.Result = sum
	out.Cost = int(time.Since(start).Milliseconds())
	resp, _ := json.Marshal(out)
	fmt.Fprintln(w, string(resp))
}
func main() {
	http.HandleFunc("/performance-testing/calculate", CalculateData)
	http.ListenAndServe(":8000", nil)
}

Reading 20k files causes your threads to be blocked doing file system IO for most of their time. Tokio does not like its worker threads making blocking calls. There is an async alternative to the std::fs module in tokio: tokio::fs. You could try using tokio::fs::File to read the contents of your files in a manner better suited to the tokio runtime. See the example in the tokio::fs::File documentation for how to use it.


Note that, unlike Go, Rust will not insert automatic suspend points into your code. If you want to make use of the async runtime, you have to use async libraries and .await them, which you are not doing here: there are no .awaits within your calculate function, so it gains nothing from being async.


@geeklint thanks very much for your reply. The calculate_from_file function is an async function:

let res = calculate_from_file(&file_name).await;

so it does use await.

@jofas following your suggestion, I adjusted the code, but the performance is still low.

use tokio::fs::File;
use tokio::io::{AsyncBufReadExt, BufReader};

async fn calculate_from_file(filename:&str) -> f64  {
    let file = File::open(filename).await;
    let reader = BufReader::new(file.unwrap());
    let mut nums: Vec<f64> = Vec::with_capacity(10000);
    let mut lines = reader.lines();
    while let Some(line) = lines.next_line().await.unwrap() {
        let fields: Vec<f64> = line
            .split('|')
            .filter(|s| !s.is_empty())
            .map(|s| s.parse::<f64>().unwrap())
            .collect();
        nums.extend(fields);
    }
    nums.sort_by(|a, b| a.partial_cmp(b).unwrap());

    let mut products: Vec<f64> = Vec::with_capacity(1000);
    for chunk in nums.chunks_exact(10) {
        products.push(chunk.iter().product());
    }

    return products.iter().sum();
}

How large are your files? I suggest you try reading ~1000 files per task instead of one, then aggregating.
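To sketch the batching idea (batch_ranges is a hypothetical helper, standard library only): partition the file indices into contiguous batches and spawn one task per batch, so the per-task overhead is amortized over many files.

```rust
use std::ops::Range;

// Partition indices 0..count into contiguous batches so one spawned
// task can loop over `batch_size` files instead of handling just one.
fn batch_ranges(count: usize, batch_size: usize) -> Vec<Range<usize>> {
    (0..count)
        .step_by(batch_size)
        .map(|start| start..(start + batch_size).min(count))
        .collect()
}
```

Each range would then be processed by a single spawned task that loops over its files and sums the per-file results.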

Async fs performance is unfortunately pretty bad. On Linux there hasn't been good support historically, and on Windows the completion-based IO doesn't fit the readiness-based model of the Future type. You should still be able to get high occupancy; it just means extra threads and copies are used. Go is either doing some dark magic, or it should be pretty comparable in performance if you're using goroutines for this, since the limitation is at the OS level; both are possible.

There's some early work on getting better performance using io_uring, such as Glommio (on Lib.rs) or bytedance/monoio on GitHub (a Rust async runtime based on io_uring).


Instead, I would suggest using sync fs as before, together with rayon, possibly using the ThreadPoolBuilder to increase the thread count depending on how much time you spend reading from disk.


Every file is 80 KB; it has 100 lines with 10 doubles per line.

I also tested reading these 20000 files and calculating them in C++ and Java.
The C++ takes 12s, Go takes 13s, and Java is the best at only 7s; I don't know why Java runs so fast.
But they all run at 100% CPU usage.
Only Rust sits at 30%-40% CPU usage and takes 30s. :joy:
Thanks very much for your suggestions, I will do more research.

Well, for the C++ case you definitely aren't using async. Try using sync mode exclusively in Rust and measure the time (i.e. forget tokio::spawn; use tokio::task::spawn_blocking and do all your work there in sync code).
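A sketch of that (the sync helpers mirror the logic of the original calculate_from_file; the tokio call is shown only in a comment because it needs the surrounding handler):

```rust
use std::fs;

// Pure CPU part: sort, multiply each group of 10, sum the products.
fn product_sum(mut nums: Vec<f64>) -> f64 {
    nums.sort_by(|a, b| a.partial_cmp(b).unwrap());
    nums.chunks_exact(10)
        .map(|chunk| chunk.iter().product::<f64>())
        .sum()
}

// Blocking read plus calculation, meant to run inside spawn_blocking.
fn calculate_from_file_sync(filename: &str) -> f64 {
    let text = fs::read_to_string(filename).unwrap();
    let nums = text
        .split(|c| c == '|' || c == '\n')
        .filter(|s| !s.trim().is_empty())
        .map(|s| s.trim().parse().unwrap())
        .collect();
    product_sum(nums)
}

// In the handler loop, one blocking task per file:
//     tasks.push(tokio::task::spawn_blocking(move || {
//         calculate_from_file_sync(&file_name)
//     }));
```

spawn_blocking hands the closure to tokio's dedicated blocking thread pool, so the async worker threads never stall on disk reads.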

Sanity check, since nobody seems to have mentioned it yet: are you compiling your Rust program in release mode? (cargo build --release)

@zirconium-n I tried tokio::task::spawn_blocking: CPU runs at 100% and it finishes the 20000 files in 6s, which is very fast. :+1:

But I do not know why you said it is not using async. Does it mean we cannot use tokio::spawn for IO-heavy functions? Or is it caused by something wrong in my async function?

No, async is great for IO in general. But in practice local-file async IO is badly supported, as I mentioned above, so pretty much everyone who needs an "async fs" just sends the request to a background thread that uses sync IO. This results in a lot of overhead even with a few concurrent operations, and with thousands they will mostly spend their time contending with each other.

The rule of thumb is to use async when you're dealing with the network, or with other operations that "happen somewhere else" on the order of at least milliseconds. There's generally little reason to use the async fs operations, but they're occasionally very useful (network-attached storage, for example) and don't hurt most of the time.

If you were to add a Semaphore from tokio::sync to limit the number of files being processed simultaneously, you would probably see better utilization and runtimes with the right value (normally around twice the number of CPUs), even with async, though not as much as with sync.


:+1: Great explanation, let me try it.

I adjusted the code as below. The Semaphore is set to 24, twice the number of CPUs, but it has no effect.

    let mut i = 0;
    let mut tasks = Vec::new();
    let semaphore = Arc::new(Semaphore::new(24));
    while i < count {
        let file_name = format!("{}/data_{}.txt", folder, i);
        // acquire a permit before spawning so at most 24 tasks run at once
        let permit = semaphore.clone().acquire_owned().await.unwrap();
        tasks.push(tokio::spawn(async move {
            let res = calculate_from_file(&file_name).await;
            drop(permit);
            res
        }));
        i += 1;
    }

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.