Stream/iterator combinators for sequential transforms?

Hi there!

I've got a situation where I have, from user input, a list of identifiers. These identifiers need to be resolved to records from a database, within a single transaction (i.e., they cannot be resolved in parallel; the underlying DBMS doesn't support that within a single tx). I've written this out longhand:

        let mut launchers = vec![];
        for id in self.launcher {
            let launcher = tx.get_launcher(&project, id).await?;
            launchers.push(launcher);
        }

I would love to understand how to write this as a transform on self.launcher.into_iter(), instead. If the lookup operation were synchronous, this would be

let launchers: Vec<_> = self.launcher
  .into_iter()
  .map(|id| tx.get_launcher(&project, id))
  .collect::<Result<_, _>>()?;
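For reference, here is a minimal, std-only sketch of that synchronous shape. The `get_launcher` function and the `u32` ids are hypothetical stand-ins, not the real database API; the point is only that collecting an iterator of `Result`s into a `Result<Vec<_>, _>` stops at the first error.

```rust
// Hypothetical stand-in for a synchronous lookup; not the real database API.
fn get_launcher(id: u32) -> Result<String, String> {
    if id == 0 {
        Err("no launcher with id 0".to_string())
    } else {
        Ok(format!("launcher-{id}"))
    }
}

// Collecting an iterator of `Result<T, E>` into `Result<Vec<T>, E>`
// yields the first `Err` encountered, or all the `Ok` values in order.
fn resolve_all(ids: Vec<u32>) -> Result<Vec<String>, String> {
    ids.into_iter().map(get_launcher).collect()
}
```

Calling `resolve_all(vec![1, 0, 2])` returns the error for id 0 and never looks up id 2, which matches the behaviour of the `for` loop with `?`.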

However, I haven't been able to ferret out the right combination of tools to do this when the transform yields a future, instead of an immediately-available result. Is this possible?

I have tried a few variations on

use futures::stream::{self, *};

let launchers = stream::iter(self.launcher)
  .map(|id| tx.get_launcher(&project, id))
  .try_collect();

but I'm not competent to understand the resulting error messages.

error[E0599]: the method `try_collect` exists for struct `futures::stream::Iter<std::iter::Map<std::vec::IntoIter<id::Id>, [closure@src/release/create.rs:34:22: 34:26]>>`, but its trait bounds were not satisfied
  --> src/release/create.rs:36:10
   |
36 |         .try_collect()
   |          ^^^^^^^^^^^ method cannot be called on `futures::stream::Iter<std::iter::Map<std::vec::IntoIter<id::Id>, [closure@src/release/create.rs:34:22: 34:26]>>` due to unsatisfied trait bounds
   |
  ::: /Users/owen/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-util-0.3.27/src/stream/iter.rs:9:1
   |
9  | pub struct Iter<I> {
   | ------------------
   | |
   | doesn't satisfy `_: TryStreamExt`
   | doesn't satisfy `_: TryStream`
   |
   = note: the following trait bounds were not satisfied:
           `futures::stream::Iter<std::iter::Map<std::vec::IntoIter<id::Id>, [closure@src/release/create.rs:34:22: 34:26]>>: TryStream`
           which is required by `futures::stream::Iter<std::iter::Map<std::vec::IntoIter<id::Id>, [closure@src/release/create.rs:34:22: 34:26]>>: TryStreamExt`
           `&futures::stream::Iter<std::iter::Map<std::vec::IntoIter<id::Id>, [closure@src/release/create.rs:34:22: 34:26]>>: TryStream`
           which is required by `&futures::stream::Iter<std::iter::Map<std::vec::IntoIter<id::Id>, [closure@src/release/create.rs:34:22: 34:26]>>: TryStreamExt`
           `&mut futures::stream::Iter<std::iter::Map<std::vec::IntoIter<id::Id>, [closure@src/release/create.rs:34:22: 34:26]>>: TryStream`
           which is required by `&mut futures::stream::Iter<std::iter::Map<std::vec::IntoIter<id::Id>, [closure@src/release/create.rs:34:22: 34:26]>>: TryStreamExt`

I believe the problem is that a stream implements TryStreamExt only if its Item is a Result<_, _>. Your closure doesn't return a Result<_, _>: it calls an async function, so it returns a Future, and the stream's items are those futures rather than their results.

You probably want then instead of map: Playground.


Ah, I overlooked that, thank you.

Unfortunately, I think lifetime problems probably scuttle this: those methods on tx receive &mut self, and passing it into a closure while also keeping it alive across await points means that both the closure and the async function's underlying state machine need to have a reference, at the same time. That's not going to work - which is frustrating, because the resulting program is (probably) sound in this regard even if the general case isn't.

I could probably work around that with some Arc<Mutex<…>> glue to hold the transaction, but at that point, what am I gaining over a for loop?

Going from &T to &mut T is easy enough – just use a mutex. (The transaction could and probably should do that internally anyway.) One technicality is to make sure id is moved, not borrowed, hence my Id newtype.
