Pass `async` function to `Iterator::map`?

async fn maximum(default: Thing, responses: Vec<Result<Response, ThingError>>) -> Result<Thing, reqwest::Error> {
  let resps : (u8, Thing) = responses.into_iter()
    .filter_map(|result| result.ok())
    .filter(|result | result.status() == StatusCode::OK)
    .map( |resp| Thing::try_parse_response(resp.bytes()))  // <<<<< resp.bytes() is `async`
    .filter_map(|result : Result<Thing, ThingError> | result.ok())
    .fold((0, default), |(num, a):(u8,Thing), b|  (num + 1, if a < b { b } else { a }));
  match resps {
    (num_responses, max) if num_responses < 2 => Err(NotEnoughResponses(num_responses, max)),
    (num_responses, max) if num_responses >= 2 => Ok(max),
    _ => panic!("How is x < 2 || x >= 2 not exhaustive?")
  }
}

In the above snippet, several HTTP requests were made and this function is being used to parse the results and find the maximum Thing. This is being called from an async function already, but it's not clear to me how to pass an async function to Iterator::map.

I figured that if it's not possible to pass an async function in where Iterator is not expecting one, then the next best thing would be to try to await the response bytes prior to forming an iterator. However, given the variable number of responses, I'd have to do something pretty ugly to resolve them.

let mut bodies = Vec::with_capacity(responses.len());
for i in 0..responses.len() {
  bodies.push(responses[i].await);
}

And then zip the two into an iterator of (Response, Body) or something like that.

Seems my plan to resolve the bodies and zip them with the Responses will is even uglier than I thought, because Response::bytes consumes the response. So I'd have to make two parallel vectors, one with the response statuses, which I extract first because Response::status is non-consuming, and one with the response bodies, which must be constructed second since it will consume the response.

       let statuses = responses.iter().map(|result| match result.as_ref() {
            Ok(response) => response.status(),
            Err(_) => StatusCode::INTERNAL_SERVER_ERROR
        }).collect::<Vec<StatusCode>>();


        let mut bodies = Vec::with_capacity(responses.len());
        let mut riter = responses.into_iter();
        loop {
            match riter.next() {
                Some(result) => match result {
                    Ok(response) => bodies.push(response.bytes().await),
                    Err(_) => bodies.push(Ok(Bytes::new()))
                }
                None => break
            }
        }

        let resps: (u8, Thing) = statuses.into_iter().zip(bodies.into_iter())
             .... (rest of the method from above, but adapted to pairs)

You should be using the Stream trait instead. You can convert an iterator into a stream with iter and if you import the extension trait StreamExt, you will have various methods such as .then available on your stream. The stream .then combinator is like .map, except you return an async block in the closure.

6 Likes

This topic was automatically closed 90 days after the last reply. New replies are no longer allowed.