Fetching and Parsing in Parallel with Rust and Tokio

Hello,

I am building a client that requests blocks from the Solana blockchain and parses them. I am using the jsonrpsee_http_client crate for the RPC client and Tokio to run the requests in parallel. However, I am running into performance issues: a batch of 20 blocks takes around 10 seconds to fetch and parse, and a batch of 40 blocks takes around 20 seconds, which works out to about 0.5 seconds per block. Solana produces a new block roughly every 0.4 seconds, so at this rate my client falls behind the chain.
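
To make the target concrete, here is a rough sketch of the arithmetic behind that requirement. The function names are purely illustrative and not part of my client; the inputs are the figures quoted above.

```rust
/// Blocks per second that must be processed to keep pace with the chain,
/// given the average interval between new blocks (in seconds).
fn required_blocks_per_second(block_interval_secs: f64) -> f64 {
    1.0 / block_interval_secs
}

/// Blocks per second actually achieved for a measured batch.
fn observed_blocks_per_second(blocks: u64, elapsed_secs: f64) -> f64 {
    blocks as f64 / elapsed_secs
}

/// True when the observed rate is at least the required rate.
fn keeps_up(blocks: u64, elapsed_secs: f64, block_interval_secs: f64) -> bool {
    observed_blocks_per_second(blocks, elapsed_secs)
        >= required_blocks_per_second(block_interval_secs)
}
```

With a 0.4 second block interval the required rate is 2.5 blocks per second, while 20 blocks in 10 seconds is only 2.0 blocks per second, so `keeps_up(20, 10.0, 0.4)` is false.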

I suspect there might be a bottleneck somewhere in my code. Can anyone help me identify where the performance issue might be and suggest a better approach to improve the block fetching rate?

Here is the code I am using:


// Imports for process_blocks (paths assumed; join_all is taken from the futures crate).
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Instant;

use futures::future::join_all;
use tokio::task::JoinHandle;

impl NodeClient {
    async fn process_blocks(&self, block_number: u64) -> Result<(), ProcessBlockError> {
        let block_numbers = self.node_client.get_next_blocks_numbers(block_number).await?;
        let mut tasks: Vec<JoinHandle<_>> = Vec::new();

        // Lowest block number whose fetch failed; u64::MAX means "no failure yet".
        let min_block_failed = Arc::new(AtomicU64::new(u64::MAX));

        // Start timing before spawning: a spawned task may begin running
        // immediately, not only once it is awaited.
        let started_at = Instant::now();

        for block_number in block_numbers.iter().cloned() {
            let node_client = self.node_client.clone();
            let min_block_failed = Arc::clone(&min_block_failed);
            let task = tokio::spawn(async move {
                log::info!("Start handling block {}", block_number);
                let result = node_client
                    .get_block_request(block_number, true)
                    .await
                    .map_err(ProcessBlockError::Node);
                log::info!("Finish handling block {}", block_number);
                if let Err(error) = &result {
                    log::error!(
                        "Failed to get block number {} with error: {:#?}",
                        block_number,
                        error
                    );
                    // Keep the smallest failed block number seen so far.
                    min_block_failed.fetch_min(block_number, Ordering::SeqCst);
                }
                (block_number, result)
            });
            tasks.push(task);
        }

        // Await all tasks to complete
        let results = join_all(tasks).await;

        // Instant is monotonic and keeps sub-second precision.
        log::info!(
            "Elapsed time: {:.3} seconds, number of tasks {}",
            started_at.elapsed().as_secs_f64(),
            results.len(),
        );

        Ok(())
    }

    async fn get_block_request(
        &self,
        block_id: u64,
        with_transactions: bool,
    ) -> Result<UiConfirmedBlock, ProcessBlockError> {
        let transaction_details = if with_transactions { "full" } else { "none" };

        let request_params = serde_json::json!({
            "encoding": "jsonParsed",
            "maxSupportedTransactionVersion": 0,
            "transactionDetails": transaction_details,
            "rewards": false
        });
        let params = rpc_params![block_id, request_params];
        let block: core::result::Result<UiConfirmedBlock, jsonrpsee_core::Error> =
            self.client.request("getBlock", params).await;
        block.map_err(|err| self.handle_error(err, "Failed to get block"))
    }
}
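
The failure tracking in process_blocks boils down to keeping the minimum failed block number in a shared atomic. Below is a minimal, standalone sketch of that pattern using `AtomicU64::fetch_min`. It uses plain std threads instead of Tokio tasks so that it runs without extra crates, and `lowest_failed_block` and the `fetch` callback are hypothetical stand-ins for the real RPC call, not part of the client above.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::thread;

/// Runs `fetch` for every block number on its own thread and returns the
/// smallest block number whose fetch failed, or `None` if all succeeded.
fn lowest_failed_block<F>(block_numbers: &[u64], fetch: F) -> Option<u64>
where
    F: Fn(u64) -> Result<(), String> + Sync,
{
    // u64::MAX acts as the "no failure yet" sentinel, so a real block
    // numbered u64::MAX could not be reported (assumed never to occur).
    let min_failed = AtomicU64::new(u64::MAX);

    // Scoped threads may borrow `fetch` and `min_failed` directly;
    // the scope joins every thread before it returns.
    thread::scope(|scope| {
        for &block_number in block_numbers {
            let fetch = &fetch;
            let min_failed = &min_failed;
            scope.spawn(move || {
                if fetch(block_number).is_err() {
                    // Atomically store min(current, block_number).
                    min_failed.fetch_min(block_number, Ordering::SeqCst);
                }
            });
        }
    });

    match min_failed.load(Ordering::SeqCst) {
        u64::MAX => None,
        lowest => Some(lowest),
    }
}
```

For example, calling it with blocks 10 through 13 and a callback that fails on odd numbers returns `Some(11)`, regardless of the order in which the threads finish.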