JoinAll and async/await

/// Asynchronously splits the number of requests to maximize the volume of traffic
async fn get_data_async(
    length: u32,
) -> Result<Vec<u8>, HyxeError> {
    let mut futures0 = vec![];
    let mut iter_count = 0;

    if length < MAX_BLOCK_SIZE {
        futures0.push(get_raw_data_async(length as usize, 0));
        iter_count += 1;
    } else {
        let mut amt_left = length;
        while amt_left > MAX_BLOCK_SIZE {
            futures0.push(get_raw_data_async(MAX_BLOCK_SIZE as usize, 0));
            amt_left -= MAX_BLOCK_SIZE;
            iter_count += 1;
        }

        if amt_left >= 1 {
            futures0.push(get_raw_data_async(amt_left as usize, 0));
            iter_count += 1;
        }
    }

    if iter_count != futures0.len() {
        return HyxeError::throw("[QuantumRandom] unable to setup asynchronous split streams");
    }
    
    
    futures::future::join_all(futures0).await.and_then(move |(ve0, ve1, ve2)| {
        let mut ret_vec = Vec::<u8>::new();

        println!("[QuantumRandom] Joined all 3 futures...");
        if ve0.is_empty() {
            eprintln!("0 vector size");
            return HyxeError::throw("Invalid set");
        }

        if let Some(ve0) = &ve0[0] {
            for byte in ve0.iter() {
                ret_vec.push(*byte);
            }
            if iter_count > 1 {
                for ve11 in ve1.iter() {
                    if let Some(ve1) = ve11 {
                        for byte in ve1.iter() {
                            ret_vec.push(*byte);
                        }
                    }
                }

                for ve22 in ve2.iter() {
                    if let Some(ve2) = ve22 {
                        for byte in ve2.iter() {
                            ret_vec.push(*byte);
                        }
                    }
                }
            }
        }

        Ok(ret_vec)
    })
}

This code does not work. How can I await on a JoinAll?

If you await the join_all future, you just get a vector, which you can loop over. If you don't await it, you can use and_then, but that is altogether less elegant.
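To illustrate the point about awaiting, here is a minimal sketch that builds with the standard library alone. Everything in it is an assumption made for illustration: `join_all_sketch` is a hand-rolled stand-in for `futures::future::join_all`, `block_on` is a toy busy-polling executor, and `fake_chunk` replaces `get_raw_data_async`, which is assumed to resolve to an `Option<Vec<u8>>`. With the real crate you would call `join_all` directly.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Waker that does nothing; enough for futures that never really block.
struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Hypothetical toy executor: polls the future in a busy loop until it completes.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

/// Stand-in for `futures::future::join_all`: polls every unfinished future
/// on each poll and yields the outputs in input order.
async fn join_all_sketch<F: Future>(futs: Vec<F>) -> Vec<F::Output> {
    let mut pending: Vec<Pin<Box<F>>> = futs.into_iter().map(Box::pin).collect();
    let mut done: Vec<Option<F::Output>> = pending.iter().map(|_| None).collect();
    std::future::poll_fn(move |cx| {
        let mut all_ready = true;
        for (slot, fut) in done.iter_mut().zip(pending.iter_mut()) {
            if slot.is_none() {
                match fut.as_mut().poll(cx) {
                    Poll::Ready(out) => *slot = Some(out),
                    Poll::Pending => all_ready = false,
                }
            }
        }
        if all_ready {
            Poll::Ready(done.iter_mut().map(|s| s.take().unwrap()).collect())
        } else {
            Poll::Pending
        }
    })
    .await
}

/// Hypothetical replacement for `get_raw_data_async`: `len` copies of `fill`.
async fn fake_chunk(len: usize, fill: u8) -> Option<Vec<u8>> {
    Some(vec![fill; len])
}

/// Awaiting the joined future yields a plain `Vec`, consumed by an ordinary loop.
async fn collect_chunks() -> Vec<u8> {
    let futs = vec![fake_chunk(2, 1), fake_chunk(1, 2), fake_chunk(3, 3)];
    let chunks = join_all_sketch(futs).await;
    let mut out = Vec::new();
    for chunk in chunks {
        // A missing chunk contributes no bytes.
        out.extend(chunk.unwrap_or_default());
    }
    out
}
```

Calling `block_on(collect_chunks())` drives the whole thing to completion. No `and_then` is involved: once the `.await` returns, the rest is plain synchronous code.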

PS: If you want help, it's best to be more specific than just "it doesn't work", e.g. include the error messages the compiler gives you.

If I understand your code well, you try to create a vector of bytes from several vectors of bytes. Note that there are simpler solutions for appending vectors, as described here.
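As a rough illustration of what "simpler" can look like, the sketch below merges several byte vectors in three ways using only the standard library. The function names are made up for this example. Which form is nicest depends on whether you own the parts or only borrow them.

```rust
/// Borrowing version: `concat` on a slice of vectors copies everything into one.
fn merge_with_concat(parts: &[Vec<u8>]) -> Vec<u8> {
    parts.concat()
}

/// Owning version: flattening the iterator moves the bytes out of each part.
fn merge_with_flatten(parts: Vec<Vec<u8>>) -> Vec<u8> {
    parts.into_iter().flatten().collect()
}

/// Explicit loop: reserve the total size once, then copy each part in.
fn merge_with_extend(parts: &[Vec<u8>]) -> Vec<u8> {
    let mut out = Vec::with_capacity(parts.iter().map(Vec::len).sum());
    for part in parts {
        out.extend_from_slice(part);
    }
    out
}
```

All three replace the nested byte-by-byte `push` loops from the original post.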

So, I can call iter() on the Vec structure (as one example), and for each element call .await. Wouldn't that wait for it to finish and then it would loop to the next, or... ?

/// Asynchronously splits the number of requests to maximize the volume of traffic
async fn get_data_async(
    length: u32,
) -> Result<Vec<u8>, HyxeError> {
    let mut futures0 = vec![];
    let mut iter_count = 0;

    if length < MAX_BLOCK_SIZE {
        futures0.push(get_raw_data_async(length as usize, 0));
        iter_count += 1;
    } else {
        let mut amt_left = length;
        while amt_left > MAX_BLOCK_SIZE {
            futures0.push(get_raw_data_async(MAX_BLOCK_SIZE as usize, 0));
            amt_left -= MAX_BLOCK_SIZE;
            iter_count += 1;
        }

        if amt_left >= 1 {
            futures0.push(get_raw_data_async(amt_left as usize, 0));
            iter_count += 1;
        }
    }

    if iter_count != futures0.len() {
        return HyxeError::throw("[QuantumRandom] unable to setup asynchronous split streams");
    }

    
    Ok(futures0.into_iter().map(|fut| async {
        fut.await.map(|res| res).map_err(|err| ())
    }).into_iter().concat().collect())
}

Here is the new code. However, I get an error underlined on the "concat" method that says:

"no method named "concat" found for type `std::iter::Map<std::vec::IntoIter<impl core::future::future::Future"

Then it says, note:
"The method concat exists, but the following trait bounds were not satisfied:"

"&mut std::iter::Map<std::vec::IntoIter> : futures_util::stream::StreamExt"

[...]

Resolved via good ol' fiddling

/// Asynchronously splits the number of requests to maximize the volume of traffic
async fn get_data_async(
    length: u32,
) -> Result<Vec<u8>, HyxeError> {
    let mut futures0 = vec![];
    let mut iter_count = 0;

    if length < MAX_BLOCK_SIZE {
        futures0.push(get_raw_data_async(length as usize, 0));
        iter_count += 1;
    } else {
        let mut amt_left = length;
        while amt_left > MAX_BLOCK_SIZE {
            futures0.push(get_raw_data_async(MAX_BLOCK_SIZE as usize, 0));
            amt_left -= MAX_BLOCK_SIZE;
            iter_count += 1;
        }

        if amt_left >= 1 {
            futures0.push(get_raw_data_async(amt_left as usize, 0));
            iter_count += 1;
        }
    }

    if iter_count != futures0.len() {
        return HyxeError::throw("[QuantumRandom] unable to setup asynchronous split streams");
    }

    Ok(futures::future::join_all(futures0).map(|arr| {
        arr.into_iter().map(|res| res.unwrap_or(Vec::new()))
    }).map(|res| res.map(|res| res).into_iter().flatten().collect::<Vec<u8>>()).await)
}

I'm glad you got it working. However, your code is still very much in the futures 0.1 style. You should be able to do something like:

let vec = join_all( futures0 ).await;

let out = vec
   .into_iter()
   .map( |res| res.unwrap_or(Vec::new()) )
   .flatten()
   .collect::<Vec<u8>>()
;

Ok(out)

Or to my taste even better:

let     data          = join_all( futures0 ).await ;
let mut out : Vec<u8> = Vec::new()                 ;
    
for chunk in data { out.extend(chunk.unwrap_or_default()) }

Ok(out)

Instead of extend you can also use append, which takes a &mut Vec and moves the elements out, leaving the source vector empty.
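A small sketch of the difference between the two, assuming each joined result is an `Option<Vec<u8>>` (inferred from the `if let Some(..)` in the first post, not confirmed anywhere in the thread). The function names are invented for the example.

```rust
/// `extend` consumes anything iterable, so each chunk can be passed by value.
fn gather_with_extend(data: Vec<Option<Vec<u8>>>) -> Vec<u8> {
    let mut out = Vec::new();
    for chunk in data {
        out.extend(chunk.unwrap_or_default());
    }
    out
}

/// `append` needs a mutable reference to a `Vec` and drains it in place.
fn gather_with_append(data: Vec<Option<Vec<u8>>>) -> Vec<u8> {
    let mut out = Vec::new();
    for chunk in data {
        let mut bytes = chunk.unwrap_or_default();
        out.append(&mut bytes); // `bytes` is empty after this call
    }
    out
}
```

Both produce the same output. `append` mostly pays off when you want to keep reusing the source vector's allocation afterwards.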

The first part of the code also has some strange logic. If you increment iter_count every time you call futures0.push(_), it will always equal futures0.len(), so the check below never fails and its body never runs.
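One possible way to drop the counter entirely is to compute the request sizes up front and build the futures from that list. The helper below is hypothetical (`split_lengths` is not in the original code) and follows the same splitting rule as the posted loop, with one deliberate difference: a length of 0 yields no requests, where the original would send a single zero-length request.

```rust
/// Hypothetical helper: sizes of the requests needed to cover `length` bytes
/// when a single request may carry at most `max_block` bytes.
fn split_lengths(length: u32, max_block: u32) -> Vec<u32> {
    assert!(max_block > 0, "max_block must be non-zero");
    let mut sizes = Vec::new();
    let mut left = length;
    // Full-size blocks while more than one block's worth remains.
    while left > max_block {
        sizes.push(max_block);
        left -= max_block;
    }
    // Whatever remains (possibly exactly one full block) goes last.
    if left > 0 {
        sizes.push(left);
    }
    sizes
}
```

The futures could then be built by mapping each size to `get_raw_data_async(n as usize, 0)`, and the number of requests is simply the length of the returned vector.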
