Try_join_all across different future types

Hi all,

I am building a gRPC server and decided to take the opportunity to learn Rust. I have been building an ORM layer using SeaORM as it supports async/await. The database schema is such that there is a central table, which we can call state for the the sake of this question, and several related tables. Again, for the sake of this question, they can be referred to as state_details_one and state_details_two. Both of the latter tables have a primary key which is the auto-incremented primary key from the state table.

I am fetching a set of state::Model objects from the state table, and then querying the correct related table using the primary key ID. It looks something like this:

// Get all states which haven't been completed.
let open_states: Vec<state::Model> = state::Entity::find()
  .filter(state::Column::State.ne(state::States::Complete))
  .all(connection)
  .await?;

// For all open states, fetch the data from the corresponding tables.
let future_states = open_states
  .iter()
  .filter_map(|model| match model.state {
      state::StateDetails::StateDetailsOne => {
         state_details_one::Entity::find_by_id(model.id).one(connection)
      }
      state::StateDetails::StateDetailsTwo => {
         state_details_two::Entity::find_by_id(model.id).one(connection)
      }
      _ => None,
  })
  .collect();

My goal is to end up with future_states being a collection which I can use with futures::future::try_join_all, allowing all of the detail states to be fetched in parallel.

The above code does not compile because the state::StateDetails::StateDetailsOne branch returns, roughly, an impl Future<Output = Result<Option<state_details_one::Model>, DbErr>> while the state::StateDetails::StateDetailsTwo branch returns an impl Future<Output = Result<Option<state_details_two::Model>, DbErr>>.

I created an enum to capture the different models and used Future's map method to try and convert the output types to be the same using a method like so:

enum StateDetails {
  StateOne(state_details_one::Model),
  StateTwo(state_details_two::Model),
}

impl From<state_details_one::Model> for StateDetails {
  ... 
}

impl From<state_details_two::Model> for StateDetails {
  ... 
}

fn convert_results<M: Into<StateDetails>>(
    query_result: std::result::Result<Option<M>, DbErr>,
) -> Result<BuyFlowModels> {
    match query_result {
        Ok(option_model) => match option_model {
            Some(model) => Ok(model.into()),
            None => bail!(ErrorKind::StateError())),
        },
        Err(db_err) => bail!(db_err),
    }
}

However, it turns out that calling map directly on the future result returns a futures::future::Map which has the original Future's Output type (Self) as part of its parameterization, meaning they are still different Future types and can't be collected together.

It looks like I can build functions which return impl Future<Result<StateDetails>> and that futures::future::Map instances will be converted into this future, but then I started running into issues with Error: distinct uses of impl Trait result in different opaque types because I was trying to use a single, templated function to do the conversions for the fetches from the various state details tables.

At any rate, I've done a lot of experimenting and poking around, and I figured it's time to ask for help. Assuming I OK with having an enum that encapsulates the different SeaORM model types, what is the right way to convert various Futures to this common type without having to await them, so that I can then use a single try_join_all? Or is there a better way of accomplishing what I am trying to do?

Thank you all for your time and help.

In general, you can use the FutureExt::boxed function to convert the different future types into a single common type using trait objects. Beyond that, I would recommend using FuturesUnordered rather than the join_all family of types.

Another possibility is to go for the unstable tokio::task::JoinSet type, which does not require boxing them.

1 Like

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.