About tokio::spawn: it doesn't look async. Is there something wrong with my code?

When I print the address in insert_single_useful_server, the output appears in order, as if the tasks were not running asynchronously.
Is something wrong? And how can I implement this using a thread pool?

use crate::snark_proof_grpc::{
    snark_task_service_client::SnarkTaskServiceClient, GetWorkerStatusRequest,
};
use crate::types::worker_models::ServerStatus::*;
use anyhow::Result;
use log::error;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use tokio::runtime::{Builder, Runtime};
use tokio::task::JoinHandle;
use tonic::Request;

const GET_SERVER_STATUS_TIME_OUT: Duration = Duration::from_secs(2);
const GET_SERVER_STATUS_THREAD_NUM: usize = 5;

async fn get_all_useful_server(
    nodes: Vec<String>,
    task_id: String,
) -> Result<Arc<Mutex<HashMap<String, String>>>, anyhow::Error> {
    let server_map = Arc::new(Mutex::new(HashMap::new()));
    let mut handles: Vec<JoinHandle<()>> = vec![];
    for node in nodes {
        let map = server_map.clone();
        let id = task_id.clone();
        let handle = tokio::spawn(async { insert_single_useful_server(map, id, node).await });
        handles.push(handle);
    }
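    for h in handles {
        // A JoinError means the spawned task panicked or was cancelled.
        if let Err(e) = h.await {
            error!("status task failed: {}", e);
        }
    }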
    Ok(server_map)
}

async fn insert_single_useful_server(
    useful_map: Arc<Mutex<HashMap<String, String>>>,
    task_id: String,
    addr: String,
) {
    let full_addr = "http://".to_string() + &addr;
    println!("{}", full_addr); // for test
    let uri = full_addr.parse::<tonic::transport::Uri>();
    match uri {
        Ok(u) => {
            let u_s = u.to_string();
            let channel = tonic::transport::Channel::builder(u)
                .timeout(GET_SERVER_STATUS_TIME_OUT)
                .connect()
                .await;
            match channel {
                Ok(c) => {
                    let mut client = SnarkTaskServiceClient::new(c);
                    let req = Request::new(GetWorkerStatusRequest { task_id });
                    let res = client.can_do_snark_task(req).await;
                    match res {
                        Ok(response) => {
                            let status = response.get_ref().msg.clone();
                            if status == Free.to_string() {
                                // If the worker is free, insert it into the map.
                                let mut s_m = useful_map.lock().unwrap();
                                s_m.insert(addr, response.get_ref().msg.clone());
                                drop(s_m)
                            }
                        }
                        Err(s) => {
                            error!("get server status failed with error: {}", s.message())
                        }
                    }
                }
                Err(e) => {
                    error!("connect {} failed with error: {}", full_addr, e)
                }
            }
        }
        Err(e) => {
            error!("address with error: {}", e)
        }
    }
}

#[tokio::test]
async fn test_get_all_useful_server() {
    use crate::types::post_server_conf::PostServerConf;
    use fil_logger;
    use uuid::Uuid;
    fil_logger::init();
    let cfg = PostServerConf::load_conf();
    match cfg {
        Some(c) => {
            let config = c.verify().unwrap();
            if let Some(nodes) = config.server_nodes {
                let server_map_r = get_all_useful_server(nodes, Uuid::new_v4().to_string()).await;
                match server_map_r {
                    Ok(server_map_arc) => {
                        let server_map = server_map_arc.lock().unwrap();
                        println!("{:?}", server_map)
                    }
                    Err(e) => {
                        println!("{}", e)
                    }
                }
            } else {
                println!("nodes error")
            }
        }
        None => {
            println!("load config failed")
        }
    }
}

The code is asynchronous, but it is not multi-threaded: #[tokio::test] uses a single-threaded (current-thread) runtime by default. I wouldn't be surprised if the single-threaded scheduler produces more deterministic, in-order results like the ones you're observing. Try specifying a multi-threaded runtime for tokio::test and see whether it makes a difference; the docs explain how to do that. By the way, I haven't read your code in much detail.

It worked. By the way, do you have any examples of using a thread pool with Tokio in this scenario? I've tried many times and failed. My most recent attempt is below; it hangs and never sends the request.

async fn get_all_useful_server(
    nodes: Vec<String>,
    task_id: String,
) -> Result<Arc<Mutex<HashMap<String, String>>>, anyhow::Error> {
    let server_map = Arc::new(Mutex::new(HashMap::new()));
    let mut handles: Vec<JoinHandle<()>> = vec![];
    let pool = Builder::new_current_thread()
        .worker_threads(GET_SERVER_STATUS_THREAD_NUM)
        .enable_all()
        .build()
        .unwrap();
    for node in nodes {
        let map = server_map.clone();
        let id = task_id.clone();
        let handle = pool.spawn(insert_single_useful_server(map, id, node));
        handles.push(handle)
    }
    for h in handles {
        h.await;
    }
    Ok(server_map)
}

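The problem is that you are creating a new runtime even though you are already running inside one. Nothing ever drives that second runtime: a current-thread runtime only runs its spawned tasks while block_on is being called on it, so the tasks you spawn on pool never start and awaiting their handles hangs. (worker_threads also has no effect on a current-thread runtime.) An async fn should not create a runtime. It should just call tokio::spawn to spawn on the current runtime.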

You can change your runtime to be multi-threaded with #[tokio::test(flavor = "multi_thread")]. See the documentation for more info.