Chunking a vec to use it with tokio spawn

Hello! I'm new with tokio and was trying to split a vec into chunks to consume it in async tasks. I have this function

async fn query_chunked(tokens: Vec<Address>, address: H160){
    let providers = vec![
        Arc::new(Provider::<Http>::try_from("https://rpc.mevblocker.io/fast").unwrap()),
        Arc::new(Provider::<Http>::try_from("https://rpc.mevblocker.io/fullprivacy").unwrap()),
        Arc::new(Provider::<Http>::try_from("https://eth.rpc.blxrbdn.com").unwrap()),
    ];

    let futures: Vec<_> = tokens.chunks(60).map(|token_chunk| tokio::spawn(
        query_balance_multiple(Arc::clone(&providers[0]), address, &token_chunk)
    ))
    .collect();

    for job in futures.into_iter() {
        job.await;
    }
}

but it fails at compilation with the following error

error[E0597]: `tokens` does not live long enough
  --> src/main.rs:27:27
   |
27 |     let futures: Vec<_> = tokens.chunks(60).map(|token_chunk| tokio::spawn(
   |                           ^^^^^^^^^^^^^^^^^ borrowed value does not live long enough
28 |         query_balance_multiple(Arc::clone(&providers[0]), address, &token_chunk)
   |         ------------------------------------------------------------------------ argument requires that `tokens` is borrowed for `'static`
...
36 | }
   | - `tokens` dropped here while still borrowed

I've tried to search for ways to do it but online I can't find any example. What would be the right way to do spawn the tasks to avoid the dropped variable? Thanks

The easiest solution is to just clone the chunks. If that's really expensive you can use an Arc. In your sample code you need to check the result from awaiting a JoinHandle.

Tried to clone and use arc but I have the same issue mentioning ``tokens escapes the function body here

Managed to get it working using a box and leaking it but would be great if anyone could point me a better way or if my approach is wrong

async fn query_chunked(tokens: Vec<Address>, address: H160){
    let providers = Arc::new(vec![
        Arc::new(Provider::<Http>::try_from("https://rpc.mevblocker.io/fast").unwrap()),
        Arc::new(Provider::<Http>::try_from("https://rpc.mevblocker.io/fullprivacy").unwrap()),
        Arc::new(Provider::<Http>::try_from("https://eth.rpc.blxrbdn.com").unwrap()),
        Arc::new(Provider::<Http>::try_from("https://1rpc.io/eth").unwrap()),
        Arc::new(Provider::<Http>::try_from("https://eth.llamarpc.com	").unwrap()),
    ]);
    let mm: &'static Vec<_> = Box::leak(Box::new(tokens));
    
    let mut tasks = vec![];
    let mut provider_idx = 0;
    for chunk in mm.chunks(100) {
        let provider = Arc::clone(&providers[provider_idx]);
        provider_idx =  (provider_idx + 1) % providers.len();
        let handle = tokio::spawn( async move {
            match query_balance_multiple(
                provider,
                address,
                &chunk,
            ).await {
                Ok(()) => {},
                Err(e) => {println!("Failed due to {:?}", e)},
            };
        });
        tasks.push(handle);
    }

    join_all(tasks).await;

}

Just put the Vec itself (or better yet, its contents) into an Arc and share that. Slice when needed.

pub async fn query_chunked(tokens: Vec<Address>, address: H160) {
    let providers = [
        Arc::new(Provider::<Http>::try_from("https://rpc.mevblocker.io/fast").unwrap()),
        Arc::new(Provider::<Http>::try_from("https://rpc.mevblocker.io/fullprivacy").unwrap()),
        Arc::new(Provider::<Http>::try_from("https://eth.rpc.blxrbdn.com").unwrap()),
        Arc::new(Provider::<Http>::try_from("https://1rpc.io/eth").unwrap()),
        Arc::new(Provider::<Http>::try_from("https://eth.llamarpc.com	").unwrap()),
    ];
    let tokens: Arc<[Address]> = tokens.into();
    let chunk_size = 100;
    let provider_iter = providers.into_iter().cycle();
    let chunk_iter = 0..(tokens.len() + chunk_size - 1) / chunk_size;
    
    let mut tasks = Vec::new();

    for (provider, chunk_idx) in provider_iter.zip(chunk_iter) {
        let tokens = tokens.clone();
        let start = chunk_idx * chunk_size;
        let end = tokens.len().min(start + chunk_size);
        let handle = tokio::spawn(async move {
            match query_balance_multiple(
                provider,
                address,
                &tokens[start..end],
            ).await {
                Ok(()) => {},
                Err(e) => {println!("Failed due to {:?}", e)},
            };
        });
        tasks.push(handle);
    }

    futures::future::join_all(tasks).await;
}

Playground

3 Likes

Thank you very much!!! Is much cleaner than before