Sequencing futures/streams

Hi there,

I am playing around with Rust’s futures library, and I am trying to find a convenient way to lift a Future<Item=Vec<Record>, Error=()> into a Stream<Item=Record, Error=()>:

    use futures::{Future, Stream};
    use futures::{future, stream};
   
    pub struct Record {
        name: String,
    }

    pub fn get_items() -> Box<Future<Item=Vec<Record>, Error=()>> {
        Box::new(future::lazy(|| {
            Ok(vec![Record {name: "a".to_string()}, Record {name: "b".to_string()}])
        }))
    }

    pub fn item_stream() -> Box<Stream<Item=Record, Error=()>> {
        Box::new(get_items().into_stream().and_then(|items| {
            stream::iter_ok(items).into_future()
        }))
    }

However, the second function doesn’t compile:

error[E0271]: type mismatch resolving `<futures::stream::StreamFuture<futures::stream::IterOk<std::vec::IntoIter<example::Record>, _>> as futures::IntoFuture>::Error == ()`
  --> src/example.rs:15:40
   |
15 |     Box::new(get_items().into_stream().and_then(|items| {
   |                                        ^^^^^^^^ expected tuple, found ()
   |
   = note: expected type `(_, futures::stream::IterOk<std::vec::IntoIter<example::Record>, _>)`
              found type `()`

error[E0271]: type mismatch resolving `<futures::stream::AndThen<futures::IntoStream<std::boxed::Box<dyn futures::Future<Item=std::vec::Vec<example::Record>, Error=()>>>, [closure@src/example.rs:15:49: 17:6], futures::stream::StreamFuture<futures::stream::IterOk<std::vec::IntoIter<example::Record>, _>>> as futures::Stream>::Item == example::Record`
  --> src/example.rs:15:5
   |
15 | /     Box::new(get_items().into_stream().and_then(|items| {
16 | |         stream::iter_ok(items).into_future()
17 | |     }))
   | |_______^ expected tuple, found struct `example::Record`
   |

What I am after is a way to sequence two functions returning streams, but I am only aware of and_then, which seems to expect a type that can be converted into a future. In Scala, which I am more familiar with, such a method would be flatMap.

Can someone help me understand the compiler error, and perhaps point me to an idiomatic way to achieve what I am trying to do here?

I’m not entirely sure that what you want can work.

Let me try to rephrase it.
You have a function called get_items which creates a Vec<Record>, which means the future will complete only once all records have been created and collected. Next you want to create a Stream which yields a single Record at a time. (Correct me if I’m wrong.)

To my understanding this is the wrong approach. A stream should give you one Record at a time and the Future will wait until all elements in the stream are exhausted.

I know that this won’t help you with the solution right now, but I want to point out that your approach is (possibly) the wrong way around.

Hi @hellow

I feel like what I am doing is fairly common and legit, as I have seen it done extensively in other streaming libraries (e.g. fs2 in Scala, or Conduit in Haskell) and in production code.

Specifically, imagine that get_items() asynchronously fetches a small set of documents from a datastore. Once I have obtained these, I want to work with a stream of items, so that I can sequence other async/effectful functions on each single record (e.g. enhancing the record with additional data from another service, or serialising it and writing it to disk). Generally, lifting the vector into a stream lets me use the other useful combinators provided by Stream, which is what I am trying to do!

Are you after something like this outline?

enum Inner {
    Future(...),
    OkStream(..),
}
struct StreamFromFutureVec(Inner);
impl StreamFromFutureVec {
  pub fn new(Future) -> Self {..}
}
impl Stream for StreamFromFutureVec {
    type Item;
    type Error;
    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
        if self.0 is Future {
            poll it
            if not ready {
                return;
            }
            // something with error
            stream::iter_ok(items) set to self.0
        }
        
        assert self.0 is OkStream
        return result of poll
    }
}
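
To make the two-state idea concrete without pulling in the futures crate, here is a minimal synchronous analogue using only std. It is a sketch under stated assumptions, not futures code: a deferred closure stands in for the Future, Iterator::next stands in for Stream::poll, and the names IterFromDeferredVec, Inner, Pending and Items are hypothetical. There is no "not ready" case here, so the early return from the outline is missing.

```rust
use std::vec::IntoIter;

// Two states, mirroring the outline: the Vec has not been produced yet,
// or it has been produced and is being drained one item at a time.
enum Inner<F, T> {
    Pending(F),
    Items(IntoIter<T>),
}

pub struct IterFromDeferredVec<F, T>(Inner<F, T>);

impl<F, T> IterFromDeferredVec<F, T>
where
    F: FnMut() -> Vec<T>,
{
    // `produce` plays the role of the future: it yields the whole Vec at once.
    pub fn new(produce: F) -> Self {
        IterFromDeferredVec(Inner::Pending(produce))
    }
}

impl<F, T> Iterator for IterFromDeferredVec<F, T>
where
    F: FnMut() -> Vec<T>,
{
    type Item = T;

    fn next(&mut self) -> Option<T> {
        // First call only: run the deferred computation and switch state.
        if let Inner::Pending(produce) = &mut self.0 {
            let items = produce();
            self.0 = Inner::Items(items.into_iter());
        }
        // From here on the wrapper is always in the `Items` state.
        match &mut self.0 {
            Inner::Items(iter) => iter.next(),
            Inner::Pending(_) => unreachable!("state was switched above"),
        }
    }
}
```

Calling IterFromDeferredVec::new(|| vec![1, 2, 3]).collect::<Vec<_>>() runs the closure once, on the first call to next, and then yields the items in order. A real Stream version would additionally have to propagate "not ready" and errors from the inner future before switching state.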

@jonh perhaps the only way to do this is indeed to define a wrapping type that implements Stream. However, I was hoping for a more light-weight solution, using existing combinators (like stream::iter_ok, etc.).

Why not just do this?

get_items()
    .and_then(|records| {
        records.into_iter().for_each(|record| use_record(record));
        Ok(())
    });

I am with @hellow: it sounds incorrect (or at least very suboptimal) to transform a set of data that is already available into a Stream.

If you need to use every element to perform a set of asynchronous operations, you can try to simply chain the steps you want to do. Look at the problem from another perspective: you use a Stream in order to asynchronously wait for an operation, get the result and let the operation asynchronously evaluate the following element (if any). In your case it is like you wait for 100 elements, then you create 100 asynchronous operations to be performed, because you already got the 100 elements and you are not waiting for a stream of data.
It looks like you just need an iterator, not a stream.

The two combinators you need are stream::iter_ok, to transform the vector into a stream, and Future::flatten_stream, to transform the Future<Stream<Item>> into a Stream<Item> (playground):

pub fn item_stream() -> Box<Stream<Item = Record, Error = ()>> {
    Box::new(get_items().map(stream::iter_ok).flatten_stream())
}

But, as mentioned by others, depending on how you are then processing the items it may be that you would want to instead map the vector into a vector of futures then transform this back into a future/stream.
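
As for the compiler error in the original attempt: in futures 0.1, Stream::into_future gives a future that resolves to the first element together with the rest of the stream, (Option<Item>, Stream), and fails with (Error, Stream). That is where both "expected tuple" messages come from, and and_then keeps that tuple as the item instead of flattening it. A rough synchronous analogue with std iterators may make the shapes easier to see. This is only an analogy, not futures code: Option stands in for the future, Iterator::flatten for flatten_stream, and head_and_rest, item_tuples and item_iter are hypothetical names.

```rust
use std::vec::IntoIter;

#[derive(Debug, PartialEq)]
pub struct Record {
    name: String,
}

// Stand-in for the future: `Some` models a future that resolved successfully.
fn get_items() -> Option<Vec<Record>> {
    Some(vec![
        Record { name: "a".to_string() },
        Record { name: "b".to_string() },
    ])
}

// Analogue of `into_future` on a stream: the first element plus the remainder.
fn head_and_rest(items: Vec<Record>) -> (Option<Record>, IntoIter<Record>) {
    let mut rest = items.into_iter();
    let head = rest.next();
    (head, rest)
}

// Analogue of the failing version: the result holds a single tuple,
// not the individual records.
fn item_tuples() -> Vec<(Option<Record>, IntoIter<Record>)> {
    get_items().into_iter().map(head_and_rest).collect()
}

// Analogue of `map(stream::iter_ok).flatten_stream()`: turn the inner Vec
// into an iterator, then flatten one level to get the records themselves.
fn item_iter() -> impl Iterator<Item = Record> {
    get_items().map(Vec::into_iter).into_iter().flatten()
}
```

Here item_tuples() yields one (head, rest) pair, while item_iter() yields the two records one by one, which is the flatMap-like behaviour the question was after.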


@Nemo157 thanks! Future::flatten_stream was indeed the combinator I was after! @dodomorandi, @hellow, @Nemo157 point taken on using an iterator instead of a Stream in this case. Yet my question was really about how to express this specific transformation using the futures library; I am glad to see that it turns out to be fairly easy and concise :)