Best approach for running iterations of future iterations

I am learning Rust and converting an existing C# app into Rust as a learning experiment. In this app, I am iterating over a list of strings, and for each string I am making n+1 async requests. I found a nice post on StackOverflow that demonstrated how to use a Stream to yield the futures into a Vec, which works well with the n+1 requests. If I make a single request to the n+1 executor, I get my responses nice and quickly. But when I try to iterate over the list of strings, the app just hangs and there is no output, so obviously my setup is wrong, and I would love some guidance on how to better structure my app. I attempted to use the executor block in various other places, as well as using the stream::unfold function higher up the chain, but ran into more errors and made my head explode :exploding_head: - taking on too much too fast, I think.

There are probably many things that could be improved here, but at the moment my focus is on getting the process_all_regions function to complete. As mentioned above, if I call the process_region function once, I get a response.

I have excluded the structs and some of the simple mapping functions for brevity.

async fn process_all_regions<'a>(mut file :File, display: std::path::Display<'a>) -> std::io::Result<()> {
    let mut output: Vec<Details> = Vec::new();
    for r in region_list().iter() {
        let single_result = process_region(r.to_string()).await;
        output = match single_result {
            Some(r) => [output, r].concat(),
            None => output.drain(..).collect()
        };
    }
    let writable = serde_json::to_string(&output).unwrap_or("".to_string());
    match file.write_all((&writable).as_bytes()) {
        Err(why) => panic!("couldn't write to {}: {}", display, why),
        Ok(_) => println!("successfully wrote to {}", display),
    };
    Ok(())
}

async fn process_region(region: String) -> Option<Vec<Details>> {
    let r = Region::from_str(&region).unwrap();
    let client = Ec2Client::new(r);
    executor::block_on(async {
        let s = describe_instances(region, client);
        let s = s.filter_map(|v| async move { v.ok() });
        let s = s.filter_map(|v| async move { v });
        let mut s = s.boxed_local();
        s.next().await
    })
}

fn describe_instances(region: String, ec2_client: Ec2Client) -> impl Stream<Item = DetailResult> {
    let max_items = 25;
    let ctx = Some(RequestContext {
        client: ec2_client,
        request: Some(get_instance_request(Some(max_items))),
        region: region
    });
    stream::unfold(ctx, |ctx| async {
        let rc = ctx.unwrap();
        let c = rc.client.clone();
        let response: Result<DescribeInstancesResult, RusotoError<DescribeInstancesError>> = c.describe_instances(rc.request?).await;
        match response {
            Ok(r) => {
                let result = process_reservations(r.reservations, rc.region.clone());
                if r.next_token.is_none() {
                    return Some((Ok(result), None));
                }
                let mut req = get_instance_request(Some(25));
                req.next_token = r.next_token;

                Some((Ok(result), Some(RequestContext {
                    client: rc.client,
                    request: Some(req),
                    region: rc.region
                })))
            },
            Err(_) => None
        }
    })
}

There's a lot going on, but one thing I notice immediately is the use of executor::block_on and std::fs::File inside async code. This is pretty much always wrong, and you can read about why here.

Instead of block_on, just await the code directly.

async fn process_region(region: String) -> Option<Vec<Details>> {
    let r = Region::from_str(&region).unwrap();
    let client = Ec2Client::new(r);
    let s = describe_instances(region, client);
    let s = s.filter_map(|v| async move { v.ok() });
    let s = s.filter_map(|v| async move { v });
    let mut s = s.boxed_local();
    s.next().await
}

Instead of std::fs::File, use an async file such as tokio::fs::File.

There are also various other things I notice, such as the unnecessary .boxed_local and the use of .next() on the stream rather than e.g. .collect().

There's also this, which is a weird way of approaching it.

output = match single_result {
    Some(r) => [output, r].concat(),
    None => output.drain(..).collect()
};

You can simplify it to

if let Some(r) = single_result {
    output.push(r);
}

This avoids creating a new vector every time you add an item to it. Or if r is a vector, use extend rather than push.

I was running into an issue because single_result is actually a vector, and the compiler didn't like me using the append method because the result wasn't mutable, so I hacked in a different solution to get some results in the short term. This is on the refactoring list :+1:

Right, you can use extend to add all elements from one vector to another.
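
For example, something along these lines. This is only a sketch using the standard library; merge_pages and the String items are hypothetical placeholders for your per-region results.

```rust
// Hypothetical helper: merges optional per-region results into one Vec.
fn merge_pages(pages: Vec<Option<Vec<String>>>) -> Vec<String> {
    let mut output = Vec::new();
    for page in pages {
        if let Some(items) = page {
            // extend takes the vector by value, so `items` does not need to be `mut`.
            // append would need `&mut items`, which is why it complained about mutability.
            output.extend(items);
        }
    }
    output
}

fn main() {
    let merged = merge_pages(vec![
        Some(vec!["a".to_string()]),
        None,
        Some(vec!["b".to_string(), "c".to_string()]),
    ]);
    println!("{:?}", merged); // prints ["a", "b", "c"]
}
```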

All good now, thank you :raised_hands:

App now using tokio File and write_all, extending the Vec and simplified the stream.

If you post it again, I may have more feedback.

Any feedback is welcome. I have pushed the code up to github so it should be a bit easier to view.

Many thanks
