I am learning Rust and converting an existing C# app into Rust as a learning experiment. In this app, I am iterating over a list of strings, for each string I am making n+1 async requests. I found a nice post on StackOverflow that demonstrated how to use the Stream to yield the futures to a Vec which works well with the n+1 requests. So If I make a single request to the n+1 executor then I get my responses nice an quickly. But when I try to iterate over the list of strings the app just hangs and there is no output, so obviously, my setup is wrong and I would love some guidance on how to better structure my app. I attempted to use the executor block in various other places as well as using the stream::unfold function higher up the chain but ran into more errors and made my head explode - taking on too much too fast I think.
There are probably many things that could be improved here, but at the moment my focus is on getting the process_all_regions function to complete. As mentioned above if I call the process_region function once then I get a response.
I have excluded the structs and some of the simple mapping functions for brevity.
async fn process_all_regions<'a>(mut file :File, display: std::path::Display<'a>) -> std::io::Result<()> {
let mut output: Vec<Details> = Vec::new();
for r in region_list().iter() {
let single_result = process_region(r.to_string()).await;
output = match single_result {
Some(r) => [output, r].concat(),
None => output.drain(..).collect()
};
}
let writable = serde_json::to_string(&output).unwrap_or("".to_string());
match file.write_all((&writable).as_bytes()) {
Err(why) => panic!("couldn't write to {}: {}", display, why),
Ok(_) => println!("successfully wrote to {}", display),
};
Ok(())
}
async fn process_region(region: String) -> Option<Vec<Details>> {
let r = Region::from_str(®ion).unwrap();
let client = Ec2Client::new(r);
executor::block_on(async {
let s = describe_instances(region, client);
let s = s.filter_map(|v| async move { v.ok() });
let s = s.filter_map(|v| async move { v });
let mut s = s.boxed_local();
s.next().await
})
}
fn describe_instances(region: String, ec2_client: Ec2Client) -> impl Stream<Item = DetailResult> {
let max_items = 25;
let ctx = Some(RequestContext {
client: ec2_client,
request: Some(get_instance_request(Some(max_items))),
region: region
});
stream::unfold(ctx, |ctx| async {
let rc = ctx.unwrap();
let c = rc.client.clone();
let response: Result<DescribeInstancesResult, RusotoError<DescribeInstancesError>> = c.describe_instances(rc.request?).await;
match response {
Ok(r) => {
let result = process_reservations(r.reservations, rc.region.clone());
if r.next_token.is_none() {
return Some((Ok(result), None));
}
let mut req = get_instance_request(Some(25));
req.next_token = r.next_token;
Some((Ok(result), Some(RequestContext {
client: rc.client,
request: Some(req),
region: rc.region
})))
},
Err(_) => None
}
})
}