What is the most idiomatic way to return futures from `futures::stream.map()` that hold a reference to a local variable?

I'm quite new to Rust, and especially to async programming in Rust. Please let me know if I'm missing something here.

So I'm trying to make requests with a concurrency limit in the following code.

        let provider = Arc::new(Provider::<Http>::connect("").await);

        let start_block = 5800000;
        let end_block = 5900000;
        let iterations = (end_block - start_block) / 1000;
        let logs: Vec<_> = futures::stream::iter(0..iterations)
            .map(|i| {
                let begin = start_block + i * 1000;
                let end = begin + 1000;
                let filter = Filter::new()
                    .from_block(begin)
                    .to_block(end)
                    .event(TRANSFER_EVENT);
                provider.get_logs(&filter)
            })
            .buffered(5)
            .collect()
            .await;

The compiler then complains:

error[E0515]: cannot return value referencing local variable `filter`
   --> src\ether_utils.rs:169:17
    |
169 |                 provider.get_logs(filter.as_ref())
    |                 ^^^^^^^^^^^^^^^^^^---------------^
    |                 |                 |
    |                 |                 `filter` is borrowed here
    |                 returns a value referencing data owned by the current function

I think I understand the error to some extent, and have come up with a workaround by rewriting the code:

        let provider = Arc::new(Provider::<Http>::connect("").await);

        let start_block = 5800000;
        let end_block = 5900000;
        let iterations = (end_block - start_block) / 1000;
        let filters: Vec<Filter> = (0..iterations)
            .map(|i| {
                let begin = start_block + i * 1000;
                let end = begin + 1000;
                // let provider = provider.clone();
                Filter::new()
                    // .at_block_hash(tx.block_hash.unwrap())
                    .from_block(begin)
                    .to_block(end)
                    .event(TRANSFER_EVENT)
            })
            .collect();
        let logs: Vec<_> = futures::stream::iter(0..iterations)
            .map(|i| provider.get_logs(&filters[i]))
            .buffered(5)
            .collect()
            .await;

The code now looks a little off to me, as I'm forced to move the filter variables out of the stream.map() closure to satisfy the lifetime check. Is there any way to make it more straightforward, such as having the future returned by provider.get_logs(&filters[i]) contain an "owned" filter instead of a reference?

The idiomatic (and only) way in Rust is to store the owned value somewhere that outlives the borrow, and then use a reference to it.

Your code creates an owned value inside the closure passed to .map (i.e. a small scope) and then returns something that references it, which Rust forbids. Rust Playground

So the workaround is actually the exact pattern Rust code adopts. (But I'm not saying the workaround you provided is the best, since no full code is shown.)

Thanks for the reply! The code is actually the full code, and the functions called are from the ethers-rs library, whose API is not under my control.

I understand that you cannot pass local references to an outer scope. Thinking about it, the real issue seems to be that I need to return a future that contains an owned Filter rather than a reference to it, yet provider.get_logs takes a reference.

Below is my attempt at using an async closure to build a future that owns the filter variable. It looks a little hacky, and it feels like there should be a better way to write it.

        let logs: Vec<_> = futures::stream::iter(0..iterations)
            .map(|i| {
                let begin = start_block + i * 1000;
                let end = begin + 1000;
                let provider = provider.clone();
                let filter = Filter::new()
                    .from_block(begin)
                    .to_block(end)
                    .event(TRANSFER_EVENT);
                (async move || provider.get_logs(&filter).await)()
            })
            .buffered(5)
            .collect()
            .await;

After looking through the Filter and provider.get_logs APIs, your code can be refactored to:

        let logs: Vec<_> = futures::stream::iter(0..iterations)
            .map(|i| async move {
                let begin = start_block + i * 1000;
                let end = begin + 1000;
                let provider = provider.clone();
                let filter = Filter::new()
                    .from_block(begin)
                    .to_block(end)
                    .event(TRANSFER_EVENT);
                provider.get_logs(&filter).await
            })
            .buffered(5)
            .collect()
            .await;

Note the type returned from the closure is owned instead of referenced. A simple snippet to reproduce the problem and address it: Rust Playground
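Since the playground link is not reproduced here, the following is a minimal sketch of the same idea using only the standard library. `Filter`, `Provider`, and `get_logs` are hypothetical stand-ins for the ethers-rs types (only the borrowing shape is meant to match), and `block_on` is a tiny hand-rolled executor that is only adequate for futures that never actually wait.

```rust
use std::future::Future;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical stand-ins for the ethers-rs types; only the borrowing shape matters.
struct Filter {
    from: u64,
    to: u64,
}

struct Provider;

impl Provider {
    // Like `get_logs`, the returned future borrows both `self` and `filter`.
    async fn get_logs(&self, filter: &Filter) -> Vec<u64> {
        (filter.from..filter.to).collect()
    }
}

// Rejected by the borrow checker: the returned future would borrow the
// local `filter`, which is dropped when the function returns.
// fn borrowed<'a>(p: &'a Provider, i: u64) -> impl Future<Output = Vec<u64>> + 'a {
//     let filter = Filter { from: i, to: i + 3 };
//     p.get_logs(&filter)
// }

// Accepted: the `async move` block owns `filter`, so the borrow taken by
// `get_logs` never leaves the future that is returned.
fn owned<'a>(p: &'a Provider, i: u64) -> impl Future<Output = Vec<u64>> + 'a {
    async move {
        let filter = Filter { from: i, to: i + 3 };
        p.get_logs(&filter).await
    }
}

// Minimal executor for this sketch: polls in a loop with a waker that does nothing.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

fn main() {
    let provider = Provider;
    let logs = block_on(owned(&provider, 5));
    println!("{logs:?}"); // prints [5, 6, 7]
}
```

The commented-out `borrowed` function has the same shape as the closure in the question; `owned` has the same shape as the `async move` version.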

Thanks for the time, that looks great!

I'd also like to mention that, to make the code compile, I had to add the line let provider_ref = &provider before the block and change let provider = provider.clone() to let provider = provider_ref.clone(). Otherwise the compiler complains about provider being moved multiple times.

Reasoning about closures has been a headache for me. I've certainly gained a somewhat deeper understanding from this thread :)
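For anyone landing here later, this is a rough standard-library-only sketch of why the extra reference helps. `Filter`, `Provider`, `get_logs`, and `fetch_all` are hypothetical stand-ins, not the ethers-rs API, and the futures are driven one at a time by a toy `block_on` rather than through `buffered(5)`.

```rust
use std::future::Future;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical stand-ins for the ethers-rs types.
struct Filter {
    from: u64,
    to: u64,
}

struct Provider;

impl Provider {
    async fn get_logs(&self, filter: &Filter) -> Vec<u64> {
        (filter.from..filter.to).collect()
    }
}

// Toy executor: fine for futures that are ready on the first poll.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

fn fetch_all(chunks: u64) -> Vec<Vec<u64>> {
    let provider = Arc::new(Provider);

    // Naming `provider` directly inside the `async move` block would move the
    // Arc itself into the first future, so the closure could only run once.
    // A shared reference is `Copy`, so every future gets its own copy of it.
    let provider_ref = &provider;

    let requests: Vec<_> = (0..chunks)
        .map(|i| async move {
            // Same effect as `provider_ref.clone()`: a new Arc handle per future.
            let provider = Arc::clone(provider_ref);
            let filter = Filter { from: i * 10, to: i * 10 + 2 };
            provider.get_logs(&filter).await
        })
        .collect();

    // Driven sequentially here; concurrency is left out of this sketch.
    requests.into_iter().map(block_on).collect()
}

fn main() {
    println!("{:?}", fetch_all(3)); // prints [[0, 1], [10, 11], [20, 21]]
}
```

An equivalent arrangement is to clone in the closure body before the `async move` block, as in `|i| { let provider = provider.clone(); async move { ... } }`, so that each future takes ownership of its own clone.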
