Program skips printing logs when running multithreaded with tokio

So I am working on a program that downloads content from a specified mirror site. The download works, but the program does not log the SHA-256 verification result as the code specifies.
When I set the number of threads to 2 (4 tasks in total), the log was printed for the first 2 tasks but not for the remaining 2.
When I set the number of threads to 4 (4 tasks in total), nothing was logged for the SHA verification at all.
It looks weird. My code is as follows:

    /// a single thread for downloading files
    async fn download_single_file(
     	client: reqwest::Client,
     	url: String,
      	repository_local_path: String,
       	progress_bar: std::sync::Arc<indicatif::MultiProgress>,
        expected_sha256: String,
    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    
  		info!(
			"Downloading from URL: {}", &url
		);
		
		let response = client
			.get(&url).send().await?;
		let total_filesize = response
			.content_length().ok_or("Failed to get filesize")?;
		
		// setup a hasher for verifying sha256
		let mut hasher = sha2::Sha256::new();	
		let filename = url.split("/").last().unwrap();
		let template_string = "{spinner:.green} [{elapsed_precise}] [{bar:40.cyan/blue}] {bytes}/{total_bytes} ({bytes_per_sec}, {eta}) - "
    		.to_string();
		let output_string = template_string + filename;
		
		let pb = progress_bar.add(
			indicatif::ProgressBar::new(total_filesize)
		);
		pb.set_style(
			indicatif::ProgressStyle::default_bar()
           	.template(output_string.as_str())
               	.expect("Error when trying to render a progress bar")
           	.progress_chars("#>-")
		);
		
		if response.status().is_success() {
			
			// construct the eventual filepath,
			// make this file downloaded to the repository folder
			let filepath = std::path::Path::new(
				&repository_local_path
			).join(filename);
			
			let mut file = std::fs::File::create(filepath)?;
			// let content = response.bytes().await?;
			
			let mut downloaded: u64 = 0;
			let mut stream = response
				.bytes_stream();
			
			use futures_util::StreamExt;
			while let Some(item) = stream.next().await {
				let chunk = item?;
				
				// write file chunk
				file.write_all(&chunk)?;
				
				// store sha256 of the chunk to the hasher
				hasher.update(&chunk);
				
				downloaded += chunk.len() as u64;
				pb.set_position(downloaded);
			}
			
		} else {
			error!(
				"Failed to download file: {} - Status: {}", 
				&url, response.status()
			);
		}
		
		let result_hash = format!("{:x}", hasher.finalize());
		if result_hash == expected_sha256 {
			info!(
				"SHA256 hash verification succeeded for file {}", 
				filename
			);
		} else {
			warn!(
				"SHA256 hash mismatch for file: {}. Expected: {}, Got: {}", 
				filename, expected_sha256, result_hash
			);
		}
		
		info!("Downloaded and saved file: {}", filename);
		
		// file.write_all(&content)?;
		pb.finish_with_message(
			format!("Downloaded {}", filename)
		);
		
		return Ok(());
    }
    
    pub async fn download_files(
    	&self,
     	large_file_information: Vec<LargeFileInformation>
    ) -> Result<(), Box<dyn std::error::Error>> {
    	let client = reqwest::Client::new();
      	
     	info!(
      		"Downloading client has initiated. {} large file(s) to be downloaded.", 
        	large_file_information.len()
      	);
      
      	if large_file_information.len() == 0 {
      		warn!(
        		"No LFS urls had been detected. This could potentially be an error?"
        	);
       	}
    
        let progress_bar = std::sync::Arc::new(
        	indicatif::MultiProgress::new()
        );
        let mut handlers = Vec::new();
        let semaphore = std::sync::Arc::new(
        	tokio::sync::Semaphore::new(
         		self.tasks
         	)
        );
        
    	for single_large_file_information in large_file_information {
     		let client_in_thread = client.clone();
       		let repository_local_path_thread = self.repository_local_path
         		.clone().unwrap();
         	let progress_bar_thread = progress_bar.clone();
          	let semaphore_thread = semaphore.clone();
         
     		let handler = tokio::task::spawn(
          		async move {
            		let _permit = semaphore_thread
              			.acquire().await.unwrap();
              		
	            	return DownloadArguments::download_single_file(
	            		client_in_thread, 
	              		single_large_file_information.url,
	                	repository_local_path_thread,
	                 	progress_bar_thread,
	                  	single_large_file_information.sha256
	            	).await;
            	}
       		);
       		handlers.push(handler);
	    }
		
		let mut results = Vec::new();
		// for calculating the number of failed tasks
		let total_handlers = handlers.len();
		
		for handler in handlers {
			let result = handler
				.await?
				.expect("Error happened when downloading a file");
			
			results.push(result);
		}
		
		if results.len() == total_handlers {
			info!("All downloads had been succeeded!");
		} else {
			let failures = total_handlers - results.len();
			warn!("{} downloads had failed.", failures);
		}
		
		return Ok(());
    }

One thing I noticed is that you're using std file IO to write the downloaded files rather than tokio's file IO APIs. The async runtime's worker thread is blocked while that file IO runs, which could cause problems if the downloads are large. Just a guess.

It might be an issue with indicatif overwriting the terminal. Have you tried removing all the terminal-manipulation code and just using a log::error! call to show the checksum (to rule out log-level filtering)?

If so, this can be solved by using MultiProgress::suspend() to hide the progress bar(s) while logging: suspend clears the bars, runs the closure (your log call), and then redraws them. I solved this problem in my application with a custom logger that suspends a shared global MultiProgress around every log record.

2 Likes

That might be the case. I will give it a try. Thanks!

1 Like

I am using MultiProgress. It might be the case. I will give it a try.