let proces = reader
.for_each(|frame| {
println!("{}, {}", frame.0, frame.1);
Ok(())
})
.and_then(|_| {
println!("Socket received FIN packet and closed connection");
Ok(())
})
.or_else(|err| {
println!("Socket closed with error: {:?}", err);
Err(err)
})
.then(|result| {
println!("Socket closed with result: {:?}", result);
Ok(())
});
After I get the frame (inside for_each()) I need to use it to create some another MyFuture and continue the chain of Future like this:
{Frame} -> {some data from previous Frame is used to create MyFuture1} -> {some data from previous MyFuture1 is used to create MyFuture2} -> {...} -> then(|res| ...)
You’ll need to make the future chain inside for_each() returns () (ie all produced values in the chain are consumed internally). Typically you just return Ok(()) (as in your snippet) to let for_each continue with the next item in the stream. If you return an Err(..), then for_each terminates with that error.
@vitalyd, I've got your idea, so I implemented Future for my frame of type MyData:
pub struct MyData(pub u64, pub String);
impl Future for MyData {
type Item = u64;
type Error = io::Error;
fn poll(&mut self) -> Result<Async<Self::Item>, Self::Error> {
Ok(Async::Ready(self.0))
}
}
My second Future will be:
pub struct Worker(pub u64, pub u64);
impl Future for Worker {
type Item = (u64, u64);
type Error = ();
fn poll(&mut self) -> Result<Async<Self::Item>, Self::Error> {
self.1 -= 1;
if self.1 == 0 {
Ok(Async::Ready((self.0, self.1)))
} else {
println!("Worker {} is not ready", self.0);
futures::task::current().notify();
Ok(Async::NotReady)
}
}
}
But when I tryed to test the chain of Futures:
fn main() {
let data = codec::MyData(0, String::from("qwerty"));
let res = data.and_then(|id| worker::Worker(id, 4)).then(|_| Ok(()));
tokio::run(res);
}
I met the error:
error[E0271]: type mismatch resolving `<mylib::worker::Worker as mylib::<unnamed>::IntoFuture>::Error == std::io::Error`
--> src/main.rs:15:20
|
15 | let res = data.and_then(|id| worker::Worker(id, 4)).then(|_| Ok(()));
| ^^^^^^^^ expected (), found struct `std::io::Error`
|
= note: expected type `()`
found type `std::io::Error`
and_then requires the two futures to have same Error type, which yours don’t. You can do data.map_err(|_| ()).and_then(...) to make the Error () for both. Alternatively, use data.then(...) and decide how to handle the std::io::Error there.