I am new to Rust. Below is the code I wrote. I want to make AWS S3 API calls in threads (similar to a thread pool in Java).
async fn copy_folder_to_s3(
    &self,
    src_local_folder: &str,
    cloud_prefix: &str,
) -> Result<(), Error> {
    let files = utils::list_files_recursive(src_local_folder).unwrap();
    let mut handles = vec![];
    for file in files {
        let src_abs_path = "/tmp/1.txt";
        let src_prefix = "/tmp";
        let cloud_url = "s3://bucket/prefix/";
        let handle = thread::spawn(|| async { self.copy_to_s3(src_abs_path, cloud_url).await });
        handles.push(handle);
    }
    for handle in handles {
        handle.join();
    }
    return Ok(());
}
pub async fn copy_to_s3(&self, local_file: String, s3_file: String) {
    let (bucket, key) = self.get_bucket_and_path(&s3_file).unwrap();
    let body = ByteStream::from_path(Path::new(&local_file)).await;
    let _ = self
        .client
        .put_object()
        .bucket(bucket)
        .key(key)
        .body(body.expect("the local file is not present !"))
        .send()
        .await;
}

pub struct AwsUtilsStore {
    client: s3::Client,
}
However, this results in the following error:
error[E0521]: borrowed data escapes outside of associated function
--> src/aws_utils.rs:119:26
|
107 | &self,
| -----
| |
| `self` is a reference that is only valid in the associated function body
| let's call the lifetime of this reference `'1`
...
119 | let handle = thread::spawn(|| async { self.copy_to_s3(src_abs_path, cloud_url).await });
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| |
| `self` escapes the associated function body here
| argument requires that `'1` must outlive `'static`
For more information about this error, try `rustc --explain E0521`.
How do I fix this issue? I have a lot of helper functions associated with &self.
You could try replacing thread::spawn with thread::scope, which removes the 'static bound on the closure you pass to thread::spawn. That alone wouldn't fix the other issue, though: a thread on its own does nothing with a future (see e.g. this section of the async book). You need an asynchronous executor/runtime to poll the future for you, like tokio for example.
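To illustrate that last point, a minimal sketch (unrelated to the S3 code, using the futures crate's simple executor): building a future does not run it; something has to poll it.

use futures::executor::block_on;

async fn say_hello() {
    println!("hello");
}

fn main() {
    // building the future does not run it; nothing is printed yet
    let fut = say_hello();
    // only when an executor polls the future does the body actually run
    block_on(fut);
}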
I did try that, but the issue is still there. Given that I am new to Rust, and this is taking a huge amount of time to grasp, I thought the forum was better suited than Stack Overflow. If you don't mind, can you give me working code?
I started off with futures::ThreadPool and finally ended up here.
I made the changes and went through the links, but I still have the issue:
let handle = thread::scope(|| async { self.copy_to_s3(src_abs_path, cloud_url).await });
Sorry, yes, that is just the change that would result in it actually trying to run your S3 operation (before, it was only creating the future on the spawned thread and immediately returning, never running it). I apologize for the confusion.
This is unfortunately still a bit messy; async code is fairly new to Rust, and this is one of the many sharp edges it has.
You can get this to compile by simply removing the spawn and the async block, which means the operations will run in sequence, but you clearly want to run them in parallel.
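For reference, a minimal sketch of that sequential version, reusing the placeholder paths and the src_local_folder parameter from your snippets:

async fn copy_folder_to_s3(&self, src_local_folder: &str) -> Result<(), Error> {
    let files = utils::list_files_recursive(src_local_folder).unwrap();
    for _file in files {
        // placeholder paths as in your snippet; derive them from `_file` in real code
        let src_abs_path = "/tmp/1.txt".to_string();
        let cloud_url = "s3://bucket/prefix/".to_string();
        // no spawn: each upload runs to completion before the next one starts
        self.copy_to_s3(src_abs_path, cloud_url).await;
    }
    Ok(())
}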
To run them in parallel, you have to promise that everything you're borrowing (here, self) will live longer than it is used for. Even though you are waiting for the tasks to finish, the type system can't know that, and currently there is no async equivalent of thread::scope that would let it. To solve this, you need to keep the thing you're calling the method on alive for as long as the task runs. The easiest way is to wrap it in an std::sync::Arc, a reference-counted smart pointer that you can share across threads. You can either move the thing the method needs into a field wrapped in an Arc, or wrap the whole type you're in inside an Arc and change the &self parameter to a self: &Arc<Self> parameter, which you clone before the async move block, something like:
async fn copy_folder_to_s3(self: &Arc<Self>) -> Result<(), Error> {
    ...
    for file in files {
        ...
        let this = self.clone(); // cloning an Arc just increases the reference count
        tokio::spawn(async move {
            this.copy_to_s3().await;
        });
    }
}
...
This is also a handy pattern outside async code!
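For example, with plain std threads (a minimal sketch, unrelated to the S3 code):

use std::sync::Arc;
use std::thread;

fn main() {
    let config = Arc::new(String::from("shared settings"));
    let mut handles = vec![];
    for i in 0..3 {
        // each thread gets its own Arc clone, so nothing borrowed has to outlive main
        let config = Arc::clone(&config);
        handles.push(thread::spawn(move || {
            println!("thread {i} sees {config}");
        }));
    }
    for handle in handles {
        handle.join().unwrap();
    }
}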
As a small note, you can also use JoinSet to make spawning and waiting for all of the tasks a little nicer; it is especially useful when you want to process the results out of order, as they complete.
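A rough sketch of what that could look like here, reusing the placeholder paths and the utils/copy_to_s3 helpers from your snippets (JoinSet needs a reasonably recent tokio):

use std::sync::Arc;
use tokio::task::JoinSet;

async fn copy_folder_to_s3(self: &Arc<Self>, src_local_folder: &str) -> Result<(), Error> {
    let files = utils::list_files_recursive(src_local_folder).unwrap();
    let mut set = JoinSet::new();
    for _file in files {
        // placeholder paths as in your snippet; derive them from `_file` in real code
        let src_abs_path = "/tmp/1.txt".to_string();
        let cloud_url = "s3://bucket/prefix/".to_string();
        let this = self.clone();
        set.spawn(async move { this.copy_to_s3(src_abs_path, cloud_url).await });
    }
    // join_next() hands tasks back in completion order, not spawn order
    while let Some(result) = set.join_next().await {
        result.expect("an upload task panicked");
    }
    Ok(())
}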
async fn copy_folder_to_s3(
    self: &Arc<Self>,
    src_local_folder: &str,
    cloud_prefix: &str,
) -> Result<(), Error> {
    let files = utils::list_files_recursive(src_local_folder).unwrap();
    let mut handles = vec![];
    for file in files {
        ...
        let this = self.clone();
        let handle =
            tokio::spawn(async move { this.copy_to_s3(src_abs_path, cloud_url).await });
        handles.push(handle);
    }
    tokio::join!(handles);
    return Ok(());
}
Error
error[E0599]: the method `poll` exists for struct `Pin<&mut tokio::future::maybe_done::MaybeDone<Vec<tokio::task::JoinHandle<()>>>>`, but its trait bounds were not satisfied
--> src/aws_utils.rs:124:9
|
124 | tokio::join!(handles);
| ^^^^^^^^^^^^^^^^^^^^^ method cannot be called on `Pin<&mut tokio::future::maybe_done::MaybeDone<Vec<tokio::task::JoinHandle<()>>>>` due to unsatisfied trait bounds
|
So I found that the following works. Let me know if there is a more elegant way to handle this?
for handle in handles {
    handle.await;
}
I will run this and see if I get any runtime errors. Thank you for the help.
You can do that, or you can use the JoinSet I mentioned, or you could write the implementation that error is talking about. But really, select! and join! (and friends) are meant as a lower-level way to handle async, and they need a lot more care around pinning, which is a fairly confusing topic.
Don't feel bad about it taking time: async Rust is still one of the tougher bits of Rust, and Rust starts hard! There's a reason the website uses words like "fearless" and not "simple"
Can you explain the above? Let's say I have to copy 10K files from S3 to local storage, in threads. Wouldn't the above spin up 10k threads first, which in itself would choke the OS, versus limiting the total threads to maybe 10 and getting the work done?
If yes, then can you give me the code with a thread pool? I started off with futures::ThreadPool, but it was way too complex for me to get things working with it.
Tokio is the thread pool, basically. You spawn 10k tasks, not threads, onto your tokio runtime, which executes them on its internal thread pool. You can configure the number of threads with #[tokio::main], or create a Runtime directly, which gives you more freedom over the configuration. Read more about it in the documentation of the tokio::runtime module.
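For example, a minimal sketch of building the runtime by hand with a capped worker pool (the builder form is equivalent to #[tokio::main(worker_threads = 10)]):

fn main() {
    let runtime = tokio::runtime::Builder::new_multi_thread()
        .worker_threads(10) // cap the pool at 10 OS threads
        .enable_all() // IO + timer drivers; the AWS SDK needs these
        .build()
        .expect("failed to build tokio runtime");

    runtime.block_on(async {
        // spawn the tasks here; 10k tasks get multiplexed onto the 10 worker threads
    });
}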
If you still want to limit the number of tasks running concurrently for some reason, you can use a Semaphore from tokio::sync.
Generally you won't need to worry about this, because an HTTP client or the like further down in your tasks will already be limiting the number of simultaneous connections or whatever, but it can be handy.
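If you do want an explicit cap, a rough sketch with a Semaphore, assuming at most 10 uploads in flight and reusing the helpers and placeholder paths from the earlier snippets:

use std::sync::Arc;
use tokio::sync::Semaphore;

async fn copy_folder_to_s3(self: &Arc<Self>, src_local_folder: &str) -> Result<(), Error> {
    // 10 permits, so at most 10 uploads run at the same time
    let semaphore = Arc::new(Semaphore::new(10));
    let files = utils::list_files_recursive(src_local_folder).unwrap();
    let mut handles = vec![];
    for _file in files {
        // placeholder paths as in your snippet; derive them from `_file` in real code
        let src_abs_path = "/tmp/1.txt".to_string();
        let cloud_url = "s3://bucket/prefix/".to_string();
        let this = self.clone();
        let semaphore = semaphore.clone();
        handles.push(tokio::spawn(async move {
            // the task waits here until one of the 10 permits is free,
            // and holds the permit until it is dropped at the end of the task
            let _permit = semaphore.acquire_owned().await.expect("semaphore closed");
            this.copy_to_s3(src_abs_path, cloud_url).await;
        }));
    }
    for handle in handles {
        handle.await.expect("an upload task panicked");
    }
    Ok(())
}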