How to call multiple async functions in several threads?

I am new to Rust. Below is the code I wrote. I want to make AWS S3 API calls in parallel threads (similar to a thread pool in Java).

    async fn copy_folder_to_s3(
        &self
    ) -> Result<(), Error> {
        let files = utils::list_files_recursive(src_local_folder).unwrap();
        let mut handles = vec![];
        for file in files {
            let src_abs_path = "/tmp/1.txt";
            let src_prefix = "/tmp";
            let cloud_url = "s3://bucket/prefix/";
            let handle = thread::spawn(|| async { self.copy_to_s3(src_abs_path, cloud_url).await });
            handles.push(handle);
        }
        for handle in handles {
            handle.join();
        }
        return Ok(());
    }

    pub async fn copy_to_s3(&self, local_file: String, s3_file: String) {
        let (bucket, key) = self.get_bucket_and_path(&s3_file).unwrap();
        let body = ByteStream::from_path(Path::new(&local_file)).await;
        let _ = self
            .client
            .put_object()
            .bucket(bucket)
            .key(key)
            .body(body.expect("the local file is not present !"))
            .send()
            .await;
    }

pub struct AwsUtilsStore {
    client: s3::Client,
}

However, this results in the following error:

error[E0521]: borrowed data escapes outside of associated function
   --> src/aws_utils.rs:119:26
    |
107 |         &self,
    |         -----
    |         |
    |         `self` is a reference that is only valid in the associated function body
    |         let's call the lifetime of this reference `'1`
...
119 |             let handle = thread::spawn(|| async { self.copy_to_s3(src_abs_path, cloud_url).await });
    |                          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    |                          |
    |                          `self` escapes the associated function body here
    |                          argument requires that `'1` must outlive `'static`

For more information about this error, try `rustc --explain E0521`.

How can I fix this? I have a lot of helper functions that take &self.

This looks similar to a recent Stack Overflow question, "rust error: borrowed value does not live long enough". Have I discovered another cross-post?

You could try replacing thread::spawn with thread::scope, which removes the 'static bound that thread::spawn puts on the closure. That wouldn't fix the other problem, though: a thread alone does nothing with a future (see, e.g., this section of the async book). You need an asynchronous executor/runtime, such as tokio, to poll the future for you.
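A minimal std-only sketch of the scoped-thread idea. It uses a hypothetical Uploader type and a synchronous copy_one method in place of the async S3 call, because a plain thread cannot drive a future. Note that the closure passed to thread::scope takes one argument, the scope s, and the borrowing threads are spawned with s.spawn:

```rust
use std::thread;

// Hypothetical stand-in for AwsUtilsStore: a prefix instead of an S3 client.
struct Uploader {
    prefix: String,
}

impl Uploader {
    // Hypothetical synchronous stand-in for copy_to_s3: just builds the target URL.
    fn copy_one(&self, file: &str) -> String {
        format!("{}/{}", self.prefix, file)
    }

    fn copy_all(&self, files: &[&str]) -> Vec<String> {
        // The closure given to thread::scope takes one argument: the scope `s`.
        thread::scope(|s| {
            // s.spawn may borrow `self` and `files`: every thread spawned on `s`
            // is joined before thread::scope returns, so there is no 'static bound.
            let handles: Vec<_> = files
                .iter()
                .map(|file| s.spawn(move || self.copy_one(file)))
                .collect();
            // Join in spawn order, so results keep the order of `files`.
            handles.into_iter().map(|h| h.join().unwrap()).collect()
        })
    }
}

fn main() {
    let uploader = Uploader { prefix: "s3://bucket/prefix".to_string() };
    println!("{:?}", uploader.copy_all(&["a.txt", "b.txt"]));
}
```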

I wrote that question myself, and the issue is still there. Since I am new to Rust and this is taking a lot of time to grasp, I thought the forum was better suited than Stack Overflow. If you don't mind, could you give me working code?

I started off with futures::executor::ThreadPool and ended up here.

I made the changes and went through the links, but I still have the issue:

let handle = thread::scope(|| async { self.copy_to_s3(src_abs_path, cloud_url).await });

Error:

  let handle = thread::scope(|| async { self.copy_to_s3(src_abs_path, cloud_url).await });
    |                          ^^^^^^^^^^^^^ -- takes 0 arguments

I am not sure what "takes 0 arguments" means.

The TL;DR is: add the tokio crate and switch from thread::spawn to tokio::spawn. (That link includes a full example.)

I changed the implementation to this:

        for file in files {
            let src_abs_path = format!("{:?}", file.canonicalize().unwrap());
            let src_prefix = src_abs_path.split(src_local_folder.clone()).nth(1).unwrap();
            let cloud_url = format!("{}/{}/{:?}", cloud_prefix, src_prefix, file.file_name());
            let handle =
                tokio::spawn(async move {
                    self.copy_to_s3(src_abs_path, cloud_url).await
                });
            // handles.push(handle);
        }

and the issue still exists. I am trying to keep copy_to_s3 intact and still get it working. If this is not the way it's supposed to be done, let me know.

 tokio::spawn(async move { 
122 | |                     self.copy_to_s3(src_abs_path, cloud_url).await 
123 | |                 });
    | |                  ^
    | |                  |
    | |__________________`self` escapes the associated function body here
    |                    argument requires that `'1` must outlive `'static`

Sorry, yes: that change only makes it actually try to run your S3 operation (before, the spawned thread only created the future and immediately returned without running it). I apologize for the confusion.

This is unfortunately still a bit messy: async code is fairly new to Rust, and this is one of its many sharp edges.

You can get this to compile by simply removing the spawn and the async block, but then the operations run in sequence, and you clearly want them to run in parallel. For that, you have to promise that everything you're borrowing (here, self) will live at least as long as the tasks that use it.

Even though you are waiting for the tasks to finish, the type system can't know that, and there's currently no async equivalent of thread::scope that would let it. To solve that, you need to keep the thing you're calling alive for as long as the task runs. The easiest way is to wrap it in a std::sync::Arc, a reference-counted smart pointer that you can share across threads. You can either move what the method needs into a local value wrapped in an Arc, or wrap instances of your type in an Arc and change the &self parameter to self: &Arc<Self>, which you then clone before the async move block, something like:

async fn copy_folder_to_s3(self: &Arc<Self>) -> Result<(), Error> {
  ...
  for file in files {
    ...
    let this = self.clone(); // cloning an Arc just increases the reference count
    tokio::spawn(async move {
      this.copy_to_s3().await;
    });
  }
}

...

This is also a handy pattern outside async code!
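For comparison, a sketch of the same Arc-cloning pattern with plain std::thread and no async at all; Store, copy_one and copy_all are hypothetical stand-ins:

```rust
use std::sync::Arc;
use std::thread;

// Hypothetical stand-in for the store type.
struct Store {
    prefix: String,
}

impl Store {
    fn copy_one(&self, file: &str) -> String {
        format!("{}/{}", self.prefix, file)
    }
}

fn copy_all(store: &Arc<Store>, files: Vec<String>) -> Vec<String> {
    let handles: Vec<_> = files
        .into_iter()
        .map(|file| {
            // Each thread gets its own Arc, so the spawned closure is 'static.
            let store = Arc::clone(store);
            thread::spawn(move || store.copy_one(&file))
        })
        .collect();
    // Join in spawn order; unwrap re-raises a panic from a worker thread.
    handles.into_iter().map(|h| h.join().unwrap()).collect()
}

fn main() {
    let store = Arc::new(Store { prefix: "s3://bucket/prefix".to_string() });
    println!("{:?}", copy_all(&store, vec!["a.txt".to_string(), "b.txt".to_string()]));
}
```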


As a small note, you can also use JoinSet to make spawning and waiting for all the tasks a little nicer; it's most useful when you want to process results as they complete, in whatever order that happens.


Hi,

It worked, but I am unable to join the futures.

    async fn copy_folder_to_s3(
        self: &Arc<Self>,
        src_local_folder: &str,
        cloud_prefix: &str,
    ) -> Result<(), Error> {
        let files = utils::list_files_recursive(src_local_folder).unwrap();

        let mut handles = vec![];
        for file in files {
            ...
            let this = self.clone();
            let handle =
                tokio::spawn(async move { this.copy_to_s3(src_abs_path, cloud_url).await });
            handles.push(handle);
        }
        tokio::join!(handles);
        return Ok(());
    }

Error


error[E0599]: the method `poll` exists for struct `Pin<&mut tokio::future::maybe_done::MaybeDone<Vec<tokio::task::JoinHandle<()>>>>`, but its trait bounds were not satisfied
   --> src/aws_utils.rs:124:9
    |
124 |         tokio::join!(handles);
    |         ^^^^^^^^^^^^^^^^^^^^^ method cannot be called on `Pin<&mut tokio::future::maybe_done::MaybeDone<Vec<tokio::task::JoinHandle<()>>>>` due to unsatisfied trait bounds
    |

I found that the following works. Let me know if there is a more elegant way to handle this.

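        for handle in handles {
            // Each JoinHandle resolves to Result<(), JoinError>; Err means the task panicked or was cancelled.
            handle.await.expect("upload task failed");
        }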

I will run this and see if I get any runtime errors. Thank you for the help.

You can do that, or you can use the JoinSet I mentioned, or you could create the implementation the error is talking about. tokio::join! expects each future as a separate argument, and a Vec<JoinHandle<()>> is not itself a future. But really, select! and join! (and friends) are meant as a lower-level way to handle async, and they need a lot more care around pinning, which is a fairly confusing topic.

I went with the for loop; at this stage I am lagging far behind on this project.

Anyway, thank you for the help. It was great.

Don't feel bad about it taking time: async Rust is still one of the tougher parts of Rust, and Rust starts out hard! There's a reason the website uses words like "fearless" and not "simple".


Can you explain the above? Let's say I have to copy 10K files from S3 to local disk using threads. Wouldn't the above spin up 10K threads first, which by itself would choke the OS? Versus limiting the total number of threads to maybe 10 and getting the work done?

If so, could you give me code that uses a thread pool? I started off with futures::executor::ThreadPool, but it was too complex for me to get working.

Tokio is basically the thread pool. You spawn 10K tasks, not threads, onto your tokio runtime, which executes them on its internal thread pool. You can configure the number of worker threads with #[tokio::main], or build a Runtime directly, which gives you more freedom in configuration. Read more in the documentation of the tokio::runtime module.


Wow! That helps. Thank you once again.

If you still want to limit the number of tasks running concurrently for some reason, you can use a Semaphore (tokio::sync::Semaphore).

Generally you won't need to worry about this, because an HTTP client or similar further down in your tasks will usually already limit the number of simultaneous connections, but it can be handy.

