Trying to run a vector of futures from inside a struct

Hello,

I have been experimenting lately with the async/await model and trying different patterns. I have the following code that I want to run.

use futures::StreamExt;

pub struct Exec;

impl Exec {
    pub async fn do_one(&self, i: i32) -> i32 {
        i * i
    }

    pub async fn do_all(&self) {
        let mut futures = Vec::new();
        
        for i in 0..100 {
            futures.push(self.do_one(i));
        }
        
        futures::stream::iter(futures).for_each_concurrent(Some(4), |f| async move {
            println!("{}", tokio::spawn(f).await.unwrap());
        }).await;
    }
    
}

#[tokio::main]
async fn main() {
    let exec = Exec{};
    
    exec.do_all().await;
}

Basically, I am trying to run the futures in parallel by spawning a new tokio task for each one. However, I am hitting an error that I fail to understand.

error[E0521]: borrowed data escapes outside of associated function
  --> src/main.rs:14:26
   |
10 |     pub async fn do_all(&self) {
   |                         -----
   |                         |
   |                         `self` is a reference that is only valid in the associated function body
   |                         let's call the lifetime of this reference `'1`
...
14 |             futures.push(self.do_one(i));
   |                          ^^^^^^^^^^^^^^
   |                          |
   |                          `self` escapes the associated function body here
   |                          argument requires that `'1` must outlive `'static`

It seems to suggest that my futures outlive the function, even though it looks like they don't: I await every task concurrently and also wait for all the futures to finish. How can I make this code work?

Thanks !

Here is a link to the relevant section in the documentation: async/await - Asynchronous Programming in Rust

This means that the future returned from an async fn must be .awaited while its non-'static arguments are still valid. In the common case of .awaiting the future immediately after calling the function (as in foo(&x).await) this is not an issue. However, if storing the future or sending it over to another task or thread, this may be an issue.

When you (needlessly) spawn a tokio task with tokio::spawn(f) you automatically require your future to be 'static, which it isn't. It is only valid for the lifetime of your Exec struct, because you provide it as a reference to your do_one method.

I see two easy ways to make your code run:

  1. do_one does not need access to self, so make it a function instead of a method. This way the returned futures will be valid for 'static, because they take full ownership of every argument.
  2. Don't use tokio::spawn. You are already in the context of tokio, so you can just call f.await instead of tokio::spawn(f).await.

First solution:

use futures::StreamExt;

pub struct Exec;

impl Exec {
    pub async fn do_one(i: i32) -> i32 {
        i * i
    }

    pub async fn do_all(&self) {
        let mut futures = Vec::new();
        
        for i in 0..100 {
            futures.push(Self::do_one(i));
        }
        
        futures::stream::iter(futures).for_each_concurrent(Some(4), |f| async move {
            println!("{}", tokio::spawn(f).await.unwrap());
        }).await;
    }
    
}

#[tokio::main]
async fn main() {
    let exec = Exec{};
    
    exec.do_all().await;
}

Second solution:

use futures::StreamExt;

pub struct Exec;

impl Exec {
    pub async fn do_one(&self, i: i32) -> i32 {
        i * i
    }

    pub async fn do_all(&self) {
        let mut futures = Vec::new();
        
        for i in 0..100 {
            futures.push(self.do_one(i));
        }
        
        futures::stream::iter(futures).for_each_concurrent(Some(4), |f| async move {
            println!("{}", f.await);
        }).await;
    }
    
}

#[tokio::main]
async fn main() {
    let exec = Exec{};
    
    exec.do_all().await;
}

Every Future from async fn holds all of function's arguments, because the async functions do not run when you call them. They only copy arguments to a Future struct, and wait until they're polled.
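
To see that laziness in isolation, here is a minimal sketch that uses only the standard library. The names mark, demo and NoopWaker are made up for this illustration, and the future is polled by hand here instead of by tokio.

```rust
use std::cell::Cell;
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// A waker that does nothing; enough to poll a future that never waits.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Hypothetical async fn: it records that its body ran, then returns a value.
async fn mark(flag: &Cell<bool>) -> i32 {
    flag.set(true);
    7
}

/// Returns (flag before polling, flag after polling, output of the future).
fn demo() -> (bool, bool, i32) {
    let flag = Cell::new(false);

    // Calling the async fn only builds the future; it now holds `&flag`.
    let fut = mark(&flag);
    let before = flag.get();

    // The body runs only once the future is polled.
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    let out = match fut.as_mut().poll(&mut cx) {
        Poll::Ready(value) => value,
        Poll::Pending => unreachable!("mark never waits on anything"),
    };

    (before, flag.get(), out)
}

fn main() {
    let (before, after, out) = demo();
    println!("ran before poll: {before}, ran after poll: {after}, output: {out}");
}
```

The flag is still false right after the call, which is why the borrow of the argument has to stay alive for as long as the future does.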

Because you have &self in async fn, the temporary loan of self becomes part of its Future, and this Future<'self> is itself temporary and valid only inside the function that called it.

However, spawn requires futures to be completely standalone, independent, and not borrow anything temporary. spawn() by design forbids async fn(&self).

You should probably avoid calling spawn. You don't need it. Futures inside the stream will be polled anyway. Just use f.await.

If you want to spawn anyway, then you must not use &self or any borrowed arguments in the async function.

Thanks for your response, it is indeed clearer now. I was just wondering if there was a way to use lifetimes to make it clear that, in this case, the futures will never outlive the function, but I guess that's not possible with threads/tasks?

Also, maybe I was not clear when describing the issue, or maybe I am wrong here. I spawn a task because I want the futures to run in parallel (in their own threads), but at most 4 at a time.
I believe that just using for_each_concurrent will still use polling, right?

Thank you for your response. I didn't mention it, but I also tried cloning the struct and calling the function on the clone, and I couldn't make that work either. Going by your response, I assume this cannot work because, no matter what, the function always borrows self?

Having it as a static function that takes ownership of the copied self should work, then.

Regarding the use of spawn: as I said to jofas, I want the futures to run in parallel while limiting the parallelism. Something like a thread pool.

Yes, correct, that is not possible. If you look at the signature of tokio::spawn, you see that the future you pass as an argument needs to implement Future + Send + 'static. The + 'static bound requires that your future contain no references that fail to satisfy the 'static lifetime. See here: Static - Rust By Example. Your futures do not meet this bound, because they contain a reference to self, which only lives for some unspecified lifetime 'a, so your futures implement Future + Send + 'a.
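
As a rough sketch of what that bound does, the snippet below uses a hypothetical helper, require_static, that only copies the bounds described above (it is not tokio's API and spawns nothing). The method names borrowed and owned, and the tiny block_on executor, are also made up for illustration; everything here is standard library only.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

pub struct Exec;

impl Exec {
    // The returned future stores `&self`, so it is only valid while that borrow is.
    pub async fn borrowed(&self, i: i32) -> i32 {
        i * i
    }

    // The returned future owns everything it needs, so it is `'static`.
    pub async fn owned(i: i32) -> i32 {
        i * i
    }
}

// Hypothetical stand-in that mirrors the bounds `tokio::spawn` puts on its argument.
fn require_static<F>(fut: F) -> F
where
    F: Future + Send + 'static,
    F::Output: Send + 'static,
{
    fut
}

struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Tiny executor for futures that never actually wait; not a replacement for tokio.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(value) = fut.as_mut().poll(&mut cx) {
            return value;
        }
    }
}

fn main() {
    let exec = Exec;

    // Fine: the borrowed future is driven while `exec` is still alive.
    println!("{}", block_on(exec.borrowed(3)));

    // Fine: nothing is borrowed, so the `'static` bound is satisfied.
    println!("{}", block_on(require_static(Exec::owned(3))));

    // Rejected by the compiler, because this future borrows `exec`:
    // let _ = require_static(exec.borrowed(3));
}
```

Uncommenting the last line should reproduce the same kind of lifetime complaint as in the original post, without tokio being involved at all.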

Yes, that'd work. If your type is hard to copy you could wrap it in an Arc pointer as well. Arc is sendable and cheap to clone.
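
A minimal sketch of the Arc variant, under a few assumptions: the offset field is hypothetical and only there so that self is actually used, require_static is a made-up helper that mirrors the bounds tokio::spawn puts on its future, and block_on is a toy executor standing in for the runtime.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

pub struct Exec {
    offset: i32, // hypothetical field, only here so that `self` is used
}

impl Exec {
    // `self: Arc<Self>` moves a shared owner into the future instead of a borrow.
    pub async fn do_one(self: Arc<Self>, i: i32) -> i32 {
        i * i + self.offset
    }
}

// Hypothetical stand-in that mirrors the bounds `tokio::spawn` puts on its argument.
fn require_static<F>(fut: F) -> F
where
    F: Future + Send + 'static,
    F::Output: Send + 'static,
{
    fut
}

struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Tiny executor for futures that never actually wait; not a replacement for tokio.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(value) = fut.as_mut().poll(&mut cx) {
            return value;
        }
    }
}

// Builds one `'static` future per input, each holding its own clone of the Arc.
fn squares(exec: &Arc<Exec>, n: i32) -> Vec<i32> {
    (0..n)
        .map(|i| {
            let fut = require_static(Arc::clone(exec).do_one(i));
            block_on(fut)
        })
        .collect()
}

fn main() {
    let exec = Arc::new(Exec { offset: 1 });
    println!("{:?}", squares(&exec, 4)); // prints [1, 2, 5, 10]
}
```

With tokio, the place where this sketch calls block_on(fut) is where tokio::spawn(fut) would go, since the future now satisfies the 'static bound.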

Cloning &self gives you &self again (references are Copy and trivially cloneable without changes).

Going from &str to String is a special case (that conversion is to_owned(), not a plain clone of the reference). When you have types with nested references, like a Future, you get the same type back.

You'd have to use self: Arc<Self> instead of &self. There's also another workaround:

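use std::future::Future;
// Not an `async fn`: only the `async move` block becomes the future, and it never touches `self`.
// On edition 2024, append `+ use<>` to the return type so the `&self` lifetime is not captured.
fn not_async(&self, i: i32) -> impl Future<Output = i32> {
    async move { i * i }
}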

This makes a Future from the async {} block, and if the block doesn't use &self, then it won't borrow it.
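
Here is a small self-contained sketch of that workaround, assuming Rust 1.82 or newer for the `+ use<>` syntax (on edition 2021 and earlier the bound can be dropped). The detached function and the toy block_on executor are made up for illustration.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

pub struct Exec;

impl Exec {
    // A plain fn that returns a future. `+ use<>` states that the returned
    // type captures no generic parameters, so it does not borrow `self`.
    pub fn not_async(&self, i: i32) -> impl Future<Output = i32> + Send + use<> {
        async move { i * i }
    }
}

// The future leaves this function even though `exec` is dropped here,
// which would not compile if the future borrowed `exec`.
fn detached(i: i32) -> impl Future<Output = i32> + Send + 'static {
    let exec = Exec;
    let fut = exec.not_async(i);
    fut
}

struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Tiny executor for futures that never actually wait; not a replacement for tokio.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(value) = fut.as_mut().poll(&mut cx) {
            return value;
        }
    }
}

fn main() {
    println!("{}", block_on(detached(5))); // prints 25
}
```

The trade-off is that the async block cannot read anything from self unless that data is first copied or cloned into a local before the block.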
