How to perform threaded data collection and gather it all in a single vector

I have the following function that collects data serially from an HTTP server:

pub fn perform_gflags_snapshot(
    hostname_port_vec: &[&str],
    snapshot_number: i32,
    yb_stats_directory: &PathBuf
) {
    let mut stored_gflags: Vec<StoredGFlags> = Vec::new();
    for hostname_port in hostname_port_vec {
        let detail_snapshot_time = Local::now();
        let gflags = read_gflags(&hostname_port);
        add_to_gflags_vector(gflags, hostname_port, detail_snapshot_time, &mut stored_gflags);
    }
    let current_snapshot_directory = &yb_stats_directory.join(&snapshot_number.to_string());
    let gflags_file = &current_snapshot_directory.join("gflags");
    let file = fs::OpenOptions::new()
        .create(true)
        .write(true)
        .open(&gflags_file)
        .unwrap_or_else(|e| {
            // Path::display() avoids the clone and the panic on non-UTF-8 paths.
            eprintln!(
                "Fatal: error writing gflags data in snapshot directory {}: {}",
                gflags_file.display(),
                e
            );
            process::exit(1);
        });
    let mut writer = csv::Writer::from_writer(file);
    for row in stored_gflags {
        writer.serialize(row).unwrap();
    }
    writer.flush().unwrap();
}

As simple as it sounds, I am having a hard time figuring out how to parallelise the read_gflags calls, each of which goes out to an HTTP server, inside the for hostname_port in hostname_port_vec loop.
I want to control the maximum degree of parallelism using a variable.

read_gflags returns a vector of about 400 rows per server, where each row is the following struct:

#[derive(Debug)]
pub struct GFlag {
    pub name: String,
    pub value: String,
}

add_to_gflags_vector essentially pushes these rows into a vector of the following struct:

#[derive(Debug, Serialize, Deserialize)]
pub struct StoredGFlags {
    pub hostname_port: String,
    pub timestamp: DateTime<Local>,
    pub gflag_name: String,
    pub gflag_value: String,
}

Have you tried rayon?

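If add_to_gflags_vector is just pushing new values, with rayon that will be something like the following. Since read_gflags yields a vector of rows per host, flat_map (rather than map) keeps the result a flat Vec<StoredGFlags>:

use rayon::prelude::*;

let stored_gflags: Vec<StoredGFlags> = hostname_port_vec
    .par_iter()
    .flat_map(|hostname_port| { ... read gflags ... return a Vec<StoredGFlags> })
    .collect();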

I am looking for a generic way to talk to multiple HTTP endpoints at the same time, and get the struct or vector of structs back from the threads.

Some of my HTTP endpoints can return a relatively large amount of data, and thus take some time.
So I want to perform that communication in parallel.
Moving the data from one Vec into another takes little time and does not really need parallelism.

Therefore I am looking for a simple way to define a limited pool of threads and call multiple HTTP endpoints (the read_gflags() function) in those threads. Each thread should either return the data it produced (Vec<GFlag>, detail_snapshot_time, hostname_port), or put it into its own Vec<StoredGFlags> and return that, in which case the vectors of all the threads need to be combined.

I cannot find anything that clearly describes this scenario, even though it seems relatively simple and logical?
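One way to get this with only the standard library is to split the host list into at most N chunks and hand each chunk to one scoped thread. Every thread returns its own vector through its join handle, and the main thread concatenates them. The following is a minimal sketch, not a definitive implementation: fetch_in_chunks and its fetch parameter are hypothetical names (fetch stands in for read_gflags), GFlag is redeclared only so the block compiles on its own, and std::thread::scope needs Rust 1.63 or later.

```rust
use std::thread;

/// Redeclared stand-in for the `GFlag` struct from the question.
#[derive(Debug, Clone, PartialEq)]
pub struct GFlag {
    pub name: String,
    pub value: String,
}

/// Hypothetical helper: runs `fetch` for every host on at most `max_threads`
/// threads and returns one `(host, flags)` pair per host, in input order.
pub fn fetch_in_chunks<F>(
    hosts: &[&str],
    max_threads: usize,
    fetch: F,
) -> Vec<(String, Vec<GFlag>)>
where
    F: Fn(&str) -> Vec<GFlag> + Sync,
{
    if hosts.is_empty() {
        return Vec::new();
    }
    // Ceiling division, so that no more than `max_threads` chunks exist.
    let threads = max_threads.max(1);
    let chunk_size = (hosts.len() + threads - 1) / threads;
    let fetch = &fetch;

    thread::scope(|scope| {
        // Spawn every worker first, then join, so the chunks run concurrently.
        let handles: Vec<_> = hosts
            .chunks(chunk_size)
            .map(|chunk| {
                scope.spawn(move || {
                    chunk
                        .iter()
                        .map(|&host| (host.to_string(), fetch(host)))
                        .collect::<Vec<_>>()
                })
            })
            .collect();

        // Each join() hands back that worker's vector; flatten them into one.
        handles
            .into_iter()
            .flat_map(|handle| handle.join().expect("worker thread panicked"))
            .collect()
    })
}
```

The thread limit is simply the max_threads argument. The trade-off of static chunking is that one slow host holds up the rest of its chunk, even when other threads are already idle.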

If you're looking for "how to do stuff in spawned threads and send data back to the main thread", there are examples in the relevant sections of the standard library, which are std::sync and std::thread. The gist of it is, "share memory by passing messages through channels".

The Book has a walkthrough of writing a basic thread pool. Rust By Example has some examples on threads and channels. The crossbeam library is a good source of high-performance concurrency tools that are not found in std.
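To make the message-passing idea above concrete, here is a rough sketch using only std::sync and std::thread. A fixed number of workers take hosts from a shared queue and send each result back over an mpsc channel. The name fetch_via_channel and the fetch parameter are made up for illustration, with fetch standing in for something like read_gflags.

```rust
use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical helper: fetches every host using `workers` threads and
/// returns `(host, data)` pairs in the order in which they arrived.
pub fn fetch_via_channel<T, F>(hosts: Vec<String>, workers: usize, fetch: F) -> Vec<(String, T)>
where
    T: Send + 'static,
    F: Fn(&str) -> T + Send + Sync + 'static,
{
    let (result_tx, result_rx) = mpsc::channel();
    // Shared work queue: each worker pops hosts (from the back) until it is empty.
    let queue = Arc::new(Mutex::new(hosts));
    let fetch = Arc::new(fetch);

    let handles: Vec<_> = (0..workers.max(1))
        .map(|_| {
            let queue = Arc::clone(&queue);
            let fetch = Arc::clone(&fetch);
            let result_tx = result_tx.clone();
            thread::spawn(move || loop {
                // The lock is held only while taking a host, not during the fetch.
                let next = queue.lock().unwrap().pop();
                match next {
                    Some(host) => {
                        let data = (*fetch)(host.as_str());
                        // Stop if the receiving side has gone away.
                        if result_tx.send((host, data)).is_err() {
                            break;
                        }
                    }
                    None => break,
                }
            })
        })
        .collect();

    // Drop the original sender so the receive loop ends once all workers finish.
    drop(result_tx);
    let results: Vec<(String, T)> = result_rx.iter().collect();
    for handle in handles {
        handle.join().expect("worker thread panicked");
    }
    results
}
```

Because workers pull from the queue as they become free, a slow endpoint only occupies one thread. The results come back in completion order, so sort them afterwards if the order matters.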

The point is that I see and understand what a lot of the packages are saying. I get what you are saying, but this is just a little too alien for me to get my head around.

The essence is that I have a Vec<StoredGFlags>, and I want to parallelize some actions (reading HTTP endpoints in my case) that produce changes (additions) to that Vec<StoredGFlags>.

I saw a blog post (https://nickymeuleman.netlify.app/blog/multithreading-rust) which returns the records in a vector, but it uses native threads, whose number is not limited. Threading IMO should always be done in a bounded way, and therefore I am confident I need a ThreadPool. But in thread pools the spawn method generally seems to be replaced by execute, which does not seem to be able to return a handle with data?
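When the call that starts a job returns no handle, the results can still get back by having each job append to shared state. As a minimal sketch of that idea with only the standard library: a fixed number of scoped threads claim host indices from an atomic counter and push their rows into one Mutex-protected vector. StoredRow is a trimmed-down, hypothetical version of StoredGFlags (no timestamp, so the block has no external dependencies), and collect_shared and fetch are illustrative names, with fetch standing in for read_gflags.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Mutex;
use std::thread;

/// Hypothetical, trimmed-down version of `StoredGFlags` from the question.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub struct StoredRow {
    pub hostname_port: String,
    pub gflag_name: String,
    pub gflag_value: String,
}

/// Hypothetical helper: `fetch` returns (name, value) pairs for one host.
/// At most `max_threads` threads run; all rows end up in a single vector.
pub fn collect_shared<F>(hosts: &[&str], max_threads: usize, fetch: F) -> Vec<StoredRow>
where
    F: Fn(&str) -> Vec<(String, String)> + Sync,
{
    let stored: Mutex<Vec<StoredRow>> = Mutex::new(Vec::new());
    let next_index = AtomicUsize::new(0);

    thread::scope(|scope| {
        for _ in 0..max_threads.max(1) {
            scope.spawn(|| loop {
                // Claim the next unprocessed host; stop when none are left.
                let index = next_index.fetch_add(1, Ordering::Relaxed);
                let host = match hosts.get(index) {
                    Some(&host) => host,
                    None => break,
                };
                // The slow call and the conversion happen outside the lock.
                let rows: Vec<StoredRow> = fetch(host)
                    .into_iter()
                    .map(|(name, value)| StoredRow {
                        hostname_port: host.to_string(),
                        gflag_name: name,
                        gflag_value: value,
                    })
                    .collect();
                // The lock is held only for the short append.
                stored.lock().unwrap().extend(rows);
            });
        }
        // Leaving the scope waits for every worker to finish.
    });

    stored.into_inner().unwrap()
}
```

The rows of one host stay together, but the order of hosts in the result depends on which thread finishes first.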
