I'm very interested in building high-performance Rust applications, especially ones that use multi-threading. Tokio makes this easy: by default, its multi-threaded runtime spins up one worker thread per CPU core available to the system.
However, to make use of this async + multi-threading capability, I find it useful to use the join_all function from the futures crate. I'm surprised that Tokio doesn't offer this feature directly.
I found a post that suggests using Tokio's JoinSet, but I haven't managed to use it correctly. The code snippet below produces the following error message:
<impl futures::Future as futures::Future>::Output cannot be sent between threads safely
the trait std::marker::Send is not implemented for <impl futures::Future as futures::Future>::Output
All I want to do is create a bunch of Futures, and then pause execution until all of them have finished. Does Tokio have a way of doing this natively, or am I stuck adding the futures dependency?
let futures = s3::create_objects().await; // this returns Vec<impl Future>
let mut join_set = JoinSet::new();
for future in futures {
    join_set.spawn(future);
}
Edit:
I just noticed that there's also a LocalSet, which the docs say is designed specifically for non-Send futures. However, Send isn't a requirement when I use futures::future::join_all. I'm still looking for a Tokio-native equivalent to join_all. The LocalSet docs indicate that all of its futures run on the same thread, which isn't the behavior I want.
I don't get what the problem is. If futures (which is runtime-agnostic) already has some piece of functionality, why would every individual runtime re-implement it?
The easiest way to do something equivalent to join_all is this:
let mut handles = Vec::new();
for task in tasks {
    handles.push(tokio::spawn(your_fn(task)));
}
let mut output = Vec::new();
for handle in handles {
    output.push(handle.await.unwrap());
}
Which non-Send type is causing that error? You should probably just get rid of the non-Send type.
Yeah I think you're right. I'm getting the non-Send type from the AWS SDK for Rust. I'm not sure why that type isn't Send, but I'll have to work around that.
Here's my async function, which creates all the Futures, collects them into a Vec<impl Future>, and returns it:
pub async fn create_objects() -> Vec<impl Future> {
    let mut future_list = vec![]; // Initialize empty Vec<T> to hold a bunch of Futures
    let aws_cfg = get_aws_cfg().await; // This is another helper function I created, which just retrieves an aws_config::SdkConfig
    let s3_client = s3::Client::new(&aws_cfg);
    for _ in 1..=10 {
        // Randomly generate UUID for S3 object name (and body / contents)
        let mut key = uuid::Uuid::new_v4().to_string();
        key.push_str(".txt");
        let body = ByteStream::new(SdkBody::from(key.clone()));
        // Add a new Future to the Vec<T>
        future_list.push(
            s3_client.put_object()
                .bucket("trevor-rust-usw1")
                .key(&key)
                .body(body)
                .send()
        );
    }
    // Return the entire Vec<T>, containing all the un-polled Futures
    return future_list;
}
My function returns Vec<impl Future>, which hides everything about the underlying type except that it's a Future; in particular, the caller can no longer see that its Output is Send.
I think I need to return Vec<Future<Output=Result<PutObjectOutput, SdkError<PutObjectError, HttpResponse>>>>, but when I do that, I get this error:
the size for values of type dyn Future<Output = Result<PutObjectOutput, SdkError<PutObjectError, Response>>> cannot be known at compilation time
the full name for the type has been written to 'C:\git\sg-sqs-app\target\debug\deps\sg_sqs_app-5b2c2a9e10987857.long-type-1088543949375940910.txt'
consider using --verbose to print the full type name to the console
the trait Sized is not implemented for dyn Future<Output = Result<PutObjectOutput, SdkError<PutObjectError, Response>>>
I'm guessing you also got this somewhere deep in the avalanche of errors?
error[E0782]: trait objects must include the `dyn` keyword
Assuming you didn't mean to type-erase the future,[1] that's the real error, and the pages of Sized-related errors are just fallout from the compiler trying to soldier on with an incorrect assumption about what you meant.
In which case, maybe you meant:
//  vvvv
Vec<impl Future<Output = ....
If you did mean to type-erase the future, you can try:
Vec<Pin<Box<
    dyn Future<Output=...>
    + Send // auto-traits will need to be specified
>>>
And you'll have to Box::pin(...) the futures.
(If the problem is indeed deeper, it will probably remain though; these suggestions are just about resolving the Sized errors.)
@quinedot I did some reading online and figured out the Box & Pin thing.
I was able to get it working with the implementation below... I think. It compiles and runs; I'm just not sure yet whether I'm getting the desired multi-threading with Tokio. I need to add some debugging code to check thread IDs.
Here's the function that creates the Vec of Futures:
pub async fn create_objects() -> Vec<Pin<Box<impl Future<Output=Result<PutObjectOutput, SdkError<PutObjectError, HttpResponse>>>>>>
{
    let mut future_list = vec![];
    let aws_cfg = get_aws_cfg().await;
    let s3_client = s3::Client::new(&aws_cfg);
    for _ in 1..=5 {
        let mut key = uuid::Uuid::new_v4().to_string();
        key.push_str(".txt");
        let body = ByteStream::new(SdkBody::from(key.clone()));
        future_list.push(
            Box::pin(
                s3_client.put_object()
                    .bucket("trevor-rust-usw1")
                    .key(&key)
                    .body(body)
                    .send()
            )
        );
    }
    return future_list;
}
Here's the code that calls the Future-generating function and spawns the futures with Tokio:
let mut futures = s3::create_objects().await;
let mut join_handle_list = vec![];
for future in futures {
    let join_handle_new = tokio::task::spawn(future);
    join_handle_list.push(join_handle_new);
}
let result = join_all(join_handle_list).await;
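Tokio has single-threaded (current_thread) and multi-threaded runtimes. The multi-threaded one is only available when the rt-multi-thread feature flag is enabled; #[tokio::main] uses it by default, and #[tokio::main(flavor = "current_thread")] gives you the single-threaded one.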
Reasonable, yes; idiomatic, no. You're doing a lot of mutation, pushing, allocation, and explicit returning that isn't needed, and your code formatting is unconventional.