When I print the address in `insert_single_useful_server`, the output appears in order rather than interleaved, as I would expect from async code.
Is something wrong? And how can I implement this using a thread pool?
use crate::snark_proof_grpc::{
snark_task_service_client::SnarkTaskServiceClient, GetWorkerStatusRequest,
};
use crate::types::worker_models::ServerStatus::*;
use anyhow::Result;
use log::error;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use tokio::runtime::{Builder, Runtime};
use tokio::task::JoinHandle;
use tonic::Request;
/// Timeout (2 seconds) applied to the gRPC endpoint used for worker status queries.
const GET_SERVER_STATUS_TIME_OUT: Duration = Duration::from_secs(2);
/// NOTE(review): not referenced anywhere in the visible code; presumably the
/// intended worker-thread count for a dedicated status-query runtime — confirm.
const GET_SERVER_STATUS_THREAD_NUM: usize = 5;
async fn get_all_useful_server(
nodes: Vec<String>,
task_id: String,
) -> Result<Arc<Mutex<HashMap<String, String>>>, anyhow::Error> {
let server_map = Arc::new(Mutex::new(HashMap::new()));
let mut handles: Vec<JoinHandle<()>> = vec![];
for node in nodes {
let map = server_map.clone();
let id = task_id.clone();
let handle = tokio::spawn(async { insert_single_useful_server(map, id, node).await });
handles.push(handle);
}
for h in handles {
h.await;
}
Ok(server_map)
}
async fn insert_single_useful_server(
useful_map: Arc<Mutex<HashMap<String, String>>>,
task_id: String,
addr: String,
) {
let mut full_addr = "http://".to_string() + &addr;
println!("{}", full_addr); // for test
let uri = full_addr.parse::<tonic::transport::Uri>();
match uri {
Ok(u) => {
let u_s = u.to_string();
let channel = tonic::transport::Channel::builder(u)
.timeout(GET_SERVER_STATUS_TIME_OUT)
.connect()
.await;
match channel {
Ok(c) => {
let mut client = SnarkTaskServiceClient::new(c);
let req = Request::new(GetWorkerStatusRequest { task_id });
let res = client.can_do_snark_task(req).await;
match res {
Ok(response) => {
let status = response.get_ref().msg.clone();
if status == Free.to_string() {
/// if free,insert it into map
let mut s_m = useful_map.lock().unwrap();
s_m.insert(addr, response.get_ref().msg.clone());
drop(s_m)
}
}
Err(s) => {
error!("get server status failed with error: {}", s.message())
}
}
}
Err(e) => {
error!("connect {} failed with error: {}", full_addr, e)
}
}
}
Err(e) => {
error!("address with error: {}", e)
}
}
}
/// Smoke test: loads the server configuration, probes every configured node,
/// and prints the resulting map of free servers. It makes no assertions; a
/// missing config or node list is only reported on stdout.
#[tokio::test]
async fn test_get_all_useful_server() {
    use crate::types::post_server_conf::PostServerConf;
    use fil_logger;
    use uuid::Uuid;

    fil_logger::init();

    // Bail out early when no configuration can be loaded.
    let loaded = match PostServerConf::load_conf() {
        Some(loaded) => loaded,
        None => {
            println!("load config failed");
            return;
        }
    };

    // An invalid configuration is treated as a test failure (panic).
    let verified = loaded.verify().unwrap();
    let nodes = match verified.server_nodes {
        Some(nodes) => nodes,
        None => {
            println!("nodes error");
            return;
        }
    };

    let task_id = Uuid::new_v4().to_string();
    match get_all_useful_server(nodes, task_id).await {
        Ok(shared) => {
            let free_servers = shared.lock().unwrap();
            println!("{:?}", free_servers)
        }
        Err(e) => println!("{}", e),
    }
}