Why doesn't Tokio have a join_all function?

I'm very interested in building high-performance Rust applications, especially using multi-threading. Tokio makes this very easy: by default, its multi-threaded runtime starts one worker thread per core available to the system.

However, in order to make use of this async + multi-threading capability, I find it useful to use the join_all function from the futures crate. I'm surprised that Tokio doesn't offer this feature directly.

I found a post that suggests using a JoinSet from Tokio, but I couldn't get it to work. The code snippet below produces the following error message:

<impl futures::Future as futures::Future>::Output cannot be sent between threads safely
the trait std::marker::Send is not implemented for <impl futures::Future as futures::Future>::Output

All I want to do is create a bunch of Futures, and then pause execution until all of them have finished. Does Tokio have a way of doing this natively, or am I stuck adding the futures dependency?

    let futures = s3::create_objects().await; // this returns Vec<impl Future>
    let mut join_set = JoinSet::new();

    for future in futures {
        join_set.spawn(future);
    }

Edit:

I just noticed that there's a LocalSet as well, which specifically says it's designed for non-Send futures. However, Send isn't a requirement when I use futures::future::join_all. I'm still looking for a Tokio-native equivalent to join_all. The LocalSet docs indicate that all the futures must run on the same thread, which isn't the desired behavior.

I don't get what the problem is. If futures (which is runtime-agnostic) has some piece of functionality, then why would every individual runtime re-implement it?


The easiest way to do something equivalent to join_all is this:

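let mut handles = Vec::new();
for task in tasks {
    // Spawn each task onto the runtime so they can run concurrently.
    handles.push(tokio::spawn(your_fn(task)));
}
let mut output = Vec::new();
for handle in handles {
    // Awaiting a JoinHandle yields Err if the task panicked or was cancelled.
    output.push(handle.await.unwrap());
}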

Which non-Send type is causing that error? You should probably just get rid of the non-Send type.

LocalSet is probably not what you're looking for.

Yeah I think you're right. I'm getting the non-Send type from the AWS SDK for Rust. I'm not sure why that type isn't Send, but I'll have to work around that.

That doesn't sound right. Can you share which specific aws type is the problem?


Yeah it's the output from this function:

So it's a Future, wrapping Result<PutObjectOutput, SdkError<PutObjectError, HttpResponse>>.

For some reason, that Result doesn't seem to implement Send, which is making the async stuff (like tokio::spawn()) break.

Here's my async function that creates the Vec<impl Future>, creates all the Futures, and then returns the Vec.

pub async fn create_objects() -> Vec<impl Future> {  
  let mut future_list = vec![]; // Initialize empty Vec<T> to hold a bunch of Futures

  let aws_cfg = get_aws_cfg().await; // This is another helper function I created, which just retrieves an aws_config::SdkConfig
  let s3_client = s3::Client::new(&aws_cfg);
  
  for _ in 1..=10 {

    // Randomly generate UUID for S3 object name (and body / contents)
    let mut key = uuid::Uuid::new_v4().to_string();
    key.push_str(".txt");      
    let body = ByteStream::new(SdkBody::from(key.clone()));

    // Add a new Future to the Vec<T>
    future_list.push(
      s3_client.put_object()
        .bucket("trevor-rust-usw1")
        .key(&key)
        .body(body)
        .send()
    ); 
  }

  // Return the entire Vec<T>, containing all the un-polled Futures
  return future_list;
}

I think I see the issue.

My function is returning Vec<impl Future>, which is limiting the view of the underlying struct as a Future, which isn't Send.

I think I need to return Vec<Future<Output=Result<PutObjectOutput, SdkError<PutObjectError, HttpResponse>>>>, but when I do that, I get this error:

the size for values of type dyn Future<Output = Result<PutObjectOutput, SdkError<PutObjectError, Response>>> cannot be known at compilation time
the full name for the type has been written to 'C:\git\sg-sqs-app\target\debug\deps\sg_sqs_app-5b2c2a9e10987857.long-type-1088543949375940910.txt'
consider using --verbose to print the full type name to the console
the trait Sized is not implemented for dyn Future<Output = Result<PutObjectOutput, SdkError<PutObjectError, Response>>>

So what's the right way to accomplish this?

I'm not going to be as much help as the other participants in this topic when it comes to tokio, however: returning impl Future leaks the auto-trait implementations of the future, like Send. So the problem is probably deeper.
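To make the auto-trait point concrete, here is a minimal std-only sketch. The names `is_send`, `output_is_send`, `opaque`, and `named` are hypothetical and not from the code above. It suggests that `Send` on the future itself does leak through `impl Future`, while an unspecified `Output` stays opaque to callers. Since `tokio::spawn` requires both the future and its `Output` to be `Send`, that may be why the error names `<impl Future as Future>::Output`:

```rust
use std::future::Future;

// Compiles only if the value itself is Send.
fn is_send<T: Send>(_: &T) -> bool {
    true
}

// Compiles only if the future's Output is Send, similar in spirit to
// the extra bound that tokio::spawn places on the output type.
fn output_is_send<F>(_: &F) -> bool
where
    F: Future,
    F::Output: Send,
{
    true
}

// Output is left unspecified, so callers cannot see what it is.
fn opaque() -> impl Future {
    async { 1u32 }
}

// Output is spelled out, so callers can check bounds on it.
fn named() -> impl Future<Output = u32> {
    async { 1u32 }
}

fn main() {
    let a = opaque();
    // The auto-trait leaks: the hidden async block is Send.
    assert!(is_send(&a));
    // Expected to be rejected by the compiler if uncommented, because
    // the Output of a bare `impl Future` is opaque to the caller:
    // assert!(output_is_send(&a));

    let b = named();
    assert!(is_send(&b));
    assert!(output_is_send(&b));
}
```

If this reading is right, writing `Vec<impl Future<Output = ...>>` instead of `Vec<impl Future>` would be enough to let the compiler see that the output is `Send`.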

I'm guessing you also got this somewhere deep down the avalanche of errors?

error[E0782]: trait objects must include the `dyn` keyword

Assuming you didn't mean to type-erase the future,[1] that's the real error, and the pages of Sized-related errors are just fallout from the compiler trying to soldier on with an incorrect assumption of what you meant.

In which case maybe you meant

//  vvvv
Vec<impl Future<Output = ....

If you did mean to type erase the future, you can try...

Vec<Pin<Box<
    dyn Future<Output=...>
    + Send // auto-traits will need to be specified
>>>

And you'll have to Box::pin(...) the futures.
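As a rough, std-only sketch of the type-erased variant: the alias `BoxedSendFuture`, the helper `make_futures`, and the toy `poll_to_completion` driver are all hypothetical names made up for illustration, and the futures here just compute a number rather than call S3. In real code the runtime would drive the futures instead of the busy-polling loop below.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Type-erased, heap-allocated future; Send must be spelled out explicitly.
type BoxedSendFuture<T> = Pin<Box<dyn Future<Output = T> + Send>>;

// Builds `n` un-polled futures, each boxed and pinned with Box::pin.
fn make_futures(n: u32) -> Vec<BoxedSendFuture<u32>> {
    (0..n)
        .map(|i| -> BoxedSendFuture<u32> { Box::pin(async move { i * 2 }) })
        .collect()
}

// A waker that does nothing; only suitable for this toy driver.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Polls a future in a loop until it is ready. This busy-waits, so it is
// only meant for futures that complete immediately, as in this sketch.
fn poll_to_completion<T>(mut fut: BoxedSendFuture<T>) -> T {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(value) = fut.as_mut().poll(&mut cx) {
            return value;
        }
    }
}

fn main() {
    let results: Vec<u32> = make_futures(3)
        .into_iter()
        .map(poll_to_completion)
        .collect();
    println!("{:?}", results);
}
```

Because the element type names both `Output` and `Send`, a vector like this should satisfy the bounds that a multi-threaded spawn function asks for, at the cost of one allocation per future.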

(If the problem is indeed deeper, it will probably remain though; these suggestions are just about resolving the Sized errors.)


  1. e.g. into a Pin<Box<dyn Future ...>>

@quinedot I did some reading online, and figured out the Box & Pin thing.

I was able to get it working with the implementation below .... I think. It compiles and runs. I'm just not sure if I'm getting the desired multi-threading with tokio yet. I need to add some debugging code to check thread IDs.
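For the thread-ID check, a minimal sketch using only the standard library might look like the following. The helper names `current_thread_id` and `ids_from_workers` are hypothetical, and plain OS threads stand in for Tokio worker threads here; the assumption is that the same `std::thread::current().id()` call could be placed inside each spawned future to see which worker polled it at that point.

```rust
use std::collections::HashSet;
use std::thread::{self, ThreadId};

// Returns the ID of whichever OS thread runs this call.
fn current_thread_id() -> ThreadId {
    thread::current().id()
}

// Spawns `n` plain OS threads and collects the ID each one reports.
fn ids_from_workers(n: usize) -> Vec<ThreadId> {
    let handles: Vec<_> = (0..n)
        .map(|_| thread::spawn(current_thread_id))
        .collect();
    handles
        .into_iter()
        .map(|handle| handle.join().expect("worker thread panicked"))
        .collect()
}

fn main() {
    let main_id = current_thread_id();
    let worker_ids = ids_from_workers(4);
    let distinct: HashSet<ThreadId> = worker_ids.iter().copied().collect();
    println!("main thread: {:?}", main_id);
    println!("worker threads: {:?}", worker_ids);
    println!("distinct worker IDs: {}", distinct.len());
}
```

On a multi-threaded runtime a task can move between worker threads across await points, so the ID logged inside a task only describes the thread that was polling it at that moment.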

1. Here's the function that creates the Vec of Futures.


pub async fn create_objects() -> Vec<Pin<Box<impl Future<Output=Result<PutObjectOutput, SdkError<PutObjectError, HttpResponse>>>>>>
{

  let mut future_list = vec![];

  let aws_cfg = get_aws_cfg().await;
  let s3_client = s3::Client::new(&aws_cfg);
  
  for _ in 1..=5 {

    let mut key = uuid::Uuid::new_v4().to_string();
    key.push_str(".txt");      
    let body = ByteStream::new(SdkBody::from(key.clone()));

    future_list.push(
      Box::pin(
        s3_client.put_object()
          .bucket("trevor-rust-usw1")
          .key(&key)
          .body(body)
          .send()
      )
    );
  }

  return future_list;
}

2. Here's the code that calls the Future generator function and spawns the futures with tokio.

    let futures = s3::create_objects().await;
    
    let mut join_handle_list = vec![];

    for future in futures {
        let join_handle_new = tokio::task::spawn(future);
        join_handle_list.push(join_handle_new);
    }
    let result = join_all(join_handle_list).await;

Does all this look reasonable?

Tokio has single-threaded and multi-threaded runtimes; which one you get is controlled by the presence of the rt-multi-thread feature flag.

Reasonable, yes; idiomatic, no. You are doing a lot of mutation and pushing and allocation and explicit returning that's not needed, and your code formatting is unconventional.

Here are some improvements.
