Stream/iterator combinators for sequential transforms?

I've got a situation where I have, from user input, a list of identifiers. These identifiers need to be resolved to records from a database, within a single transaction (i.e., they cannot be resolved in parallel; the underlying DBMS doesn't support that within a single tx). I've written this out longhand:

        let mut launchers = vec![];
        for id in self.launcher {
            let launcher = tx.get_launcher(&project, id).await?;
            launchers.push(launcher);
        }

I would love to understand how to write this as a transform on self.launcher.into_iter(), instead. If the lookup operation were synchronous, this would be

let launchers: Vec<_> = self.launcher
  .into_iter()
  .map(|id| tx.get_launcher(&project, id))
  .collect();

However, I haven't been able to ferret out the right combination of tools to do this when the transform yields a future, instead of an immediately-available result. Is this possible?

I have tried a few variations on

use futures::stream::{self, *};

let launchers = stream::iter(self.launcher)
  .map(|id| tx.get_launcher(&project, id))
  .try_collect()
  .await?;

but I'm not competent to understand the resulting error messages.

error[E0599]: the method `try_collect` exists for struct `futures::stream::Iter<std::iter::Map<std::vec::IntoIter<id::Id>, [closure@src/release/ 34:26]>>`, but its trait bounds were not satisfied
  --> src/release/
36 |         .try_collect()
   |          ^^^^^^^^^^^ method cannot be called on `futures::stream::Iter<std::iter::Map<std::vec::IntoIter<id::Id>, [closure@src/release/ 34:26]>>` due to unsatisfied trait bounds
  ::: /Users/owen/.cargo/registry/src/
9  | pub struct Iter<I> {
   | ------------------
   | |
   | doesn't satisfy `_: TryStreamExt`
   | doesn't satisfy `_: TryStream`
   = note: the following trait bounds were not satisfied:
           `futures::stream::Iter<std::iter::Map<std::vec::IntoIter<id::Id>, [closure@src/release/ 34:26]>>: TryStream`
           which is required by `futures::stream::Iter<std::iter::Map<std::vec::IntoIter<id::Id>, [closure@src/release/ 34:26]>>: TryStreamExt`
           `&futures::stream::Iter<std::iter::Map<std::vec::IntoIter<id::Id>, [closure@src/release/ 34:26]>>: TryStream`
           which is required by `&futures::stream::Iter<std::iter::Map<std::vec::IntoIter<id::Id>, [closure@src/release/ 34:26]>>: TryStreamExt`
           `&mut futures::stream::Iter<std::iter::Map<std::vec::IntoIter<id::Id>, [closure@src/release/ 34:26]>>: TryStream`
           which is required by `&mut futures::stream::Iter<std::iter::Map<std::vec::IntoIter<id::Id>, [closure@src/release/ 34:26]>>: TryStreamExt`

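I believe the problem is that a stream implements TryStream (and therefore gets the TryStreamExt methods) only if its Item is Result<_, _>. Your closure doesn't return a Result<_, _>: get_launcher is async, so the closure returns a Future, and the stream's items are futures rather than results.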

You probably want then instead of map, since then awaits each future before moving on to the next item: Playground.


Ah, I overlooked that, thank you.

Unfortunately, I think lifetime problems probably scuttle this: those methods on tx receive &mut self, and passing it into a closure while also keeping it alive across await points means that both the closure and the async function's underlying state machine need to have a reference, at the same time. That's not going to work - which is frustrating, because the resulting program is (probably) sound in this regard even if the general case isn't.

I could probably work around that with some Arc<Mutex<…>> glue to hold the transaction, but at that point, what am I gaining over a for loop?

Going from &T to &mut T is easy enough – just use a mutex. (The transaction could and probably should do that internally anyway.) One technicality is to make sure id is moved, not borrowed, hence my Id newtype.

