Create chain of serial Futures

I have the framed reader from socket like this:

let proces = reader
        .for_each(|frame| {
            println!("{}, {}", frame.0, frame.1);
            Ok(())
        })
        .and_then(|_| {
            println!("Socket received FIN packet and closed connection");
            Ok(())
        })
        .or_else(|err| {
            println!("Socket closed with error: {:?}", err);
            Err(err)
        })
        .then(|result| {
            println!("Socket closed with result: {:?}", result);
            Ok(())
        });

After I get the frame (inside for_each()) I need to use it to create some another MyFuture and continue the chain of Future like this:

{Frame} -> {some data from previous Frame is used to create MyFuture1} -> {some data from previous MyFuture1 is used to create MyFuture2} -> {...} -> then(|res| ...)

How can I do it?

Something like:

reader
        .for_each(|frame| {
               get_first_future(frame)
                  .and_then(|data| get_second_future(data)
                  .then(|res| ...)
        })

You’ll need to make the future chain inside for_each() returns () (ie all produced values in the chain are consumed internally). Typically you just return Ok(()) (as in your snippet) to let for_each continue with the next item in the stream. If you return an Err(..), then for_each terminates with that error.

1 Like

@vitalyd, I've got your idea, so I implemented Future for my frame of type MyData:

pub struct MyData(pub u64, pub String);

impl Future for MyData {
    type Item = u64;
    type Error = io::Error;

    fn poll(&mut self) -> Result<Async<Self::Item>, Self::Error> {
        Ok(Async::Ready(self.0))
    }
}

My second Future will be:

pub struct Worker(pub u64, pub u64);

impl Future for Worker {
    type Item = (u64, u64);
    type Error = ();

    fn poll(&mut self) -> Result<Async<Self::Item>, Self::Error> {

        self.1 -= 1;

        if self.1 == 0 {
            Ok(Async::Ready((self.0, self.1)))
        } else {
            println!("Worker {} is not ready", self.0);
            futures::task::current().notify();
            Ok(Async::NotReady)
        }
    }
}

But when I tryed to test the chain of Futures:

fn main() {

    let data = codec::MyData(0, String::from("qwerty"));

    let res = data.and_then(|id| worker::Worker(id, 4)).then(|_| Ok(()));

    tokio::run(res);
}

I met the error:

error[E0271]: type mismatch resolving `<mylib::worker::Worker as mylib::<unnamed>::IntoFuture>::Error == std::io::Error`
  --> src/main.rs:15:20
   |
15 |     let res = data.and_then(|id| worker::Worker(id, 4)).then(|_| Ok(()));
   |                    ^^^^^^^^ expected (), found struct `std::io::Error`
   |
   = note: expected type `()`
              found type `std::io::Error`

What's the reson of my failure?

1 Like

and_then requires the two futures to have same Error type, which yours don’t. You can do data.map_err(|_| ()).and_then(...) to make the Error () for both. Alternatively, use data.then(...) and decide how to handle the std::io::Error there.

2 Likes

@vitalyd, thank you!

2 Likes