I want to tick 10 times at an interval of 1s before exiting the program.
The problem is, I can't figure out how to exit the reactor.
let mut x = 0;
let ticks = Interval::new(Duration::new(1, 0), &core_handle).unwrap();
// for_each hands each stream item (here `()`) to the closure, so it takes one argument.
let ticks_future = ticks.for_each(move |_| {
    if x == 10 {
        // stop reactor.
    }
    x += 1;
    Ok(())
});
core.run(ticks_future).unwrap();
From what I understand, the reactor will stop when the one-off future passed to core.run (here ticks_future) completes.
One solution I have in mind is to wrap ticks_future in another future, parent_future, pass parent_future to the reactor, and then manually complete it when x == 10 is true. I don't know how (if at all) this would work.
Another solution I thought of was to use core_handle to signal the reactor to stop, but there is no such API.
Wrapping is generally a good way of solving Tokio problems: if a future doesn't do what's needed, wrap it in another one that does. In this case, though, one should wrap the stream directly, because a future derived from an Interval with for_each() will never resolve on its own.
A complete program might look like this:
#[macro_use]
extern crate futures;
extern crate tokio_core;
use std::error::Error;
use std::time::Duration;
use futures::{Async, Poll, Stream};
use tokio_core::reactor::{Core, Interval};
struct TenTicks {
interval: Box<Stream<Item=(), Error=<Interval as Stream>::Error>>,
x: i32,
}
impl Stream for TenTicks {
type Item = ();
type Error = <Interval as Stream>::Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
match try_ready!(self.interval.poll()) {
Some(_) => {
if self.x == 10 {
Ok(Async::Ready(None))
} else {
self.x += 1;
Ok(Async::Ready(Some(())))
}
},
None => Ok(Async::Ready(None)), // the underlying stream ended, so end this one too
}
}
}
fn do_ticks() -> Result<(), Box<Error>> {
let mut core = Core::new()?;
let ticks = Interval::new(Duration::new(1, 0), &core.handle())?;
let ten_ticks_future = TenTicks {
interval: Box::new(ticks),
x: 0
}.for_each(|_| Ok(()));
Ok(core.run(ten_ticks_future)?)
}
fn main() {
match do_ticks() {
Ok(_) => println!("done"),
Err(e) => println!("{}", e),
}
}
I went ahead and tried to include oneshot in my code, and this is how it looks:
extern crate tokio_core;
extern crate futures;
use futures::future::*;
use futures::{Future, Stream};
use tokio_core::reactor::{Core, Interval};
use std::time::Duration;
use futures::sync::oneshot;
fn main() {
let mut core = Core::new().unwrap();
let handle = core.handle();
let (tx, rx) = oneshot::channel::<u8>();
let mut c = 0;
let interval_stream = Interval::new(Duration::new(1, 0), &handle)
.unwrap();
let fut = interval_stream.for_each(move |_| {
println!("fired !!! {}", c);
if c == 10 {
println!("done: {}", c);
tx.send(1);
}
c += 1;
ok(())
});
let oneoff_fut = fut.select2(rx);
core.run(oneoff_fut);
}
But then the compiler complains:
error[E0507]: cannot move out of captured outer variable in an `FnMut` closure
--> src/main.rs:28:13
|
16 | let (tx, rx) = oneshot::channel::<u8>();
| -- captured outer variable
...
28 | tx.send(1);
| ^^ cannot move out of captured outer variable in an `FnMut` closure
error: aborting due to previous error
Correct me if I'm wrong: move moves ownership of a variable into the closure. On the first call of the closure, tx can be used, but on later calls tx would no longer be available, because tx.send() in the first call has already consumed it. Hence the compiler complains.
Is this what the error means?
If yes, then how can I access tx repeatedly inside the closure? Should I take a reference?
PS: Thanks all for the help! Each solution gives a lot of stuff to learn.
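On the closing question: oneshot::Sender::send takes self by value, so a reference would not help. One common pattern is to keep the sender in an Option and take() it out the first time it is needed. Below is a minimal sketch of that pattern using only std, so it runs without tokio. OneShotSender, run_ticks and signal_after are hypothetical names: the first stands in for the real oneshot sender, the second for a for_each that calls an FnMut closure once per tick.

```rust
use std::sync::mpsc;

// Hypothetical stand-in for a oneshot sender: `send` consumes `self`,
// so it can be called at most once.
struct OneShotSender(mpsc::Sender<u8>);

impl OneShotSender {
    fn send(self, value: u8) -> Result<(), u8> {
        self.0.send(value).map_err(|e| e.0)
    }
}

// Hypothetical stand-in for `for_each`: calls an FnMut closure once per tick.
fn run_ticks<F: FnMut(u32)>(total_ticks: u32, mut tick: F) {
    for i in 0..total_ticks {
        tick(i);
    }
}

// Returns the received value (if any) and how many times `send` ran.
fn signal_after(limit: u32, total_ticks: u32) -> (Option<u8>, u32) {
    let (tx, rx) = mpsc::channel();
    // Wrapping the sender in an Option lets the closure move it out once.
    let mut tx = Some(OneShotSender(tx));
    let mut sends = 0;
    run_ticks(total_ticks, |i| {
        if i >= limit {
            // `take()` leaves `None` behind, so later calls skip this branch.
            if let Some(tx) = tx.take() {
                let _ = tx.send(1);
                sends += 1;
            }
        }
    });
    (rx.try_recv().ok(), sends)
}

fn main() {
    println!("{:?}", signal_after(10, 15));
}
```

The closure stays callable many times because it only ever moves the sender out of the Option, never out of the captured variable itself.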
Returning Err to signal completion of the stream seems kind of dirty. It would've been nicer if the closure passed to Stream::for_each returned IntoFuture<Item=Continue, ...>, where Continue is an enum indicating Yes or No (or something like that). Maybe that's too niche though.
In this case take_while is probably OK. The more general issue with it, though, is timeliness: the stream won't actually terminate until the take_while predicate returns false, which happens on the iteration after the one where you want the stream to end. And that last iteration may not come for a while (if at all). The thread "Tokio - stop listening for new tcp connections conditionally" is another case of wanting to terminate the stream ASAP.
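For comparison, std iterators already have something close to the Continue idea: Iterator::try_for_each combined with std::ops::ControlFlow. The sketch below shows only that analogy. It uses plain iterators, not futures or streams, and ticks_until is a hypothetical helper name.

```rust
use std::ops::ControlFlow;

// Counts how many "ticks" were handled before the closure asked to stop.
fn ticks_until(limit: u32) -> u32 {
    let mut seen = 0;
    // An endless source of ticks, like an interval that never finishes.
    let _ = (0u32..).try_for_each(|i| {
        if i == limit {
            // Signal "stop iterating" without treating it as an error.
            return ControlFlow::Break(());
        }
        seen += 1;
        ControlFlow::Continue(())
    });
    seen
}

fn main() {
    println!("handled {} ticks", ticks_until(10));
}
```

Here the closure decides after each item whether the loop goes on, which is the role the proposed Continue enum would play for for_each.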
Sorry for the late reply. I just stumbled over this thread and a possible solution came to my mind.
The easiest way to quit after 10 ticks in this case would be to limit the interval stream. ticks is an interval stream that produces () every tick. If you want to quit after 10 ticks, simply limit the stream to 10 items by using the take() combinator. When the stream ends, the loop future completes and the reactor stops.
let ticks_future = ticks.take(10).for_each(move |_| { ... });
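The difference between take and take_while mentioned earlier in the thread can be made visible by counting how many items each one pulls from its source. This is a sketch with std iterators, used purely as an analogy. That the futures stream combinators behave the same way is an assumption here, and pulls_take and pulls_take_while are hypothetical helper names.

```rust
use std::cell::Cell;

// Returns (items yielded, items pulled from the source) for take_while.
fn pulls_take_while(limit: u32) -> (usize, u32) {
    let pulled = Cell::new(0u32);
    // `inspect` runs once for every item the adapter pulls from the source.
    let source = (0u32..).inspect(|_| pulled.set(pulled.get() + 1));
    let yielded = source.take_while(|&i| i < limit).count();
    (yielded, pulled.get())
}

// Returns (items yielded, items pulled from the source) for take.
fn pulls_take(limit: usize) -> (usize, u32) {
    let pulled = Cell::new(0u32);
    let source = (0u32..).inspect(|_| pulled.set(pulled.get() + 1));
    let yielded = source.take(limit).count();
    (yielded, pulled.get())
}

fn main() {
    println!("take_while: {:?}", pulls_take_while(10));
    println!("take:       {:?}", pulls_take(10));
}
```

take_while has to see one extra item before it can stop, while take stops as soon as it has handed out its quota. With a 1s interval, that extra item would mean waiting for one more tick.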