How to stop tokio-reactor


#1

Hi,

New rustacean here.

I want to tick 10 times at an interval of 1s before exiting the program.
Problem is, i can’t figure out how to exit the reactor?

    let mut x = 0;
    let ticks = Interval::new(Duration::new(1, 0), &core_handle).unwrap();
    let ticks_future = ticks.for_each( move || {

        if x == 10 {
            // stop reactor.
        }
        x += 1;

        Ok(())

    });

    core.run(ticks_future).unwrap();

From what i understand, reactor will stop when “one off” future ticks_future is completed.

One solution i have in mind is to wrap ticks_future in another future parent_future, pass it (parent_future) to reactor, and then manually complete the parent_future when x == 10 is true. I don’t know how (if ever) this will work.

Other solution I thought of was to use core_handle to signal the reactor to stop. Though, there is no such API.

Thanks


#2
extern crate futures;
extern crate tokio_core;

use std::thread;

use futures::future::{self, Future};
use futures::sync::oneshot;
use tokio_core::reactor::Core;

fn main() {
    let (tx, rx) = oneshot::channel();
    let thread_handle = thread::spawn(move||run(rx));
    tx.send(()).unwrap();
    thread_handle.join().unwrap();
}

fn run(rx: oneshot::Receiver<()>) {
    let mut core = Core::new().unwrap();
    let f = future::empty::<(), ()>().select2(rx);
    core.run(f).unwrap();
}

Future has a combinator select2 ,

Waits for either one of two differently-typed futures to complete.


#3

This is generally a good way of solving Tokio problems: if a future doesn’t do what’s needed, wrap it another one which does. In this case, though, one should wrap the stream directly, because a future derived from an Interval with for_each() will never resolve.

A complete program might look like this:

#[macro_use]
extern crate futures;
extern crate tokio_core;

use std::error::Error;
use std::time::Duration;

use futures::{Async, Poll, Stream};
use tokio_core::reactor::{Core, Interval};

struct TenTicks {
    interval: Box<Stream<Item=(), Error=<Interval as Stream>::Error>>,
    x: i32,
}

impl Stream for TenTicks {
    type Item = ();
    type Error = <Interval as Stream>::Error;

    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
        match try_ready!(self.interval.poll()) {
            Some(_) => {
                if self.x == 10 {
                    Ok(Async::Ready(None))
                } else {
                    self.x += 1;
                    Ok(Async::Ready(Some(())))
                }
            },
            _ => unimplemented!(),
        }
    }
}

fn do_ticks() -> Result<(), Box<Error>> {
    let mut core = Core::new()?;
    let ticks = Interval::new(Duration::new(1, 0), &core.handle())?;
    let ten_ticks_future = TenTicks {
        interval: Box::new(ticks),
        x: 0
    }.for_each(|_| Ok(()));
    Ok(core.run(ten_ticks_future)?)
}

fn main() {
    match do_ticks() {   
        Ok(_) => println!("done"),
        Err(e) => println!("{}", e),
    }
}

#4

Another way to do this is with futures::future::loop_fn:

let mut core = Core::new().unwrap();
let ticks = Interval::new(Duration::new(1, 0), &core.handle()).unwrap();
let fut = loop_fn((ticks, 0), |(ticks, x)| {
     ticks.into_future().and_then(move |(_, t)| {
            if x == 10 {
                return Ok(Loop::Break(()));
            }
            Ok(Loop::Continue((t, x + 1)))
        })
    });

let _ = core.run(fut);

#5

Thanks @nooberfsh for the idea.

I went ahead and tried to include oneshot in my code, and this is how it looks:

extern crate tokio_core;
extern crate futures;

use futures::future::*;
use futures::{Future, Stream};
use tokio_core::reactor::{Core, Interval};
use std::time::Duration;
use futures::sync::oneshot;

fn main() {

    let mut core = Core::new().unwrap();
    let handle = core.handle();

    let (tx, rx) = oneshot::channel::<u8>();


    let mut c = 0;
    let interval_stream = Interval::new(Duration::new(1, 0), &handle)
                            .unwrap();

    let fut = interval_stream.for_each(move |_| {

        println!("fired !!! {}", c);
        if c == 10 {
            println!("done: {}", c);
            tx.send(1);
        }
        c += 1;

        ok(())

    });

    let oneoff_fut = fut.select2(rx);

    core.run(oneoff_fut);

}

But, then compiler complains:

error[E0507]: cannot move out of captured outer variable in an `FnMut` closure
  --> src/main.rs:28:13
   |
16 |     let (tx, rx) = oneshot::channel::<u8>();
   |          -- captured outer variable
...
28 |             tx.send(1);
   |             ^^ cannot move out of captured outer variable in an `FnMut` closure

error: aborting due to previous error

CMIIW: move moves the ownership of a variable into the closure. For first call of closure, tx can be used, but for further calls, tx would not be available as the call to closure in first iteration has already consumed tx. Hence the compiler complains.

Is this what the error means?

If yes, then how can I access tx repeatedly inside closure? Should I take a reference?

PS: Thanks all for the help! Each solution gives a lot of stuff to learn. :smiley:


#6

You can put the sender into a RefCell<Option<...>> like this: https://play.rust-lang.org/?gist=6b80e6112bb30a4549e60b9ed7b76fe2&version=stable

Not elegant but works.


#7

You need not use oneshot in your example, just return an Err in for_each.

    let fut = interval_stream.map_err(|_|()).for_each(move |_| {

        println!("fired !!! {}", c);
        if c == 5 {
            println!("done: {}", c);
            //tx.send(1);
            return Err(());
        }
        c += 1;

        return Ok(());

    });


#8

Returning Err to signal completion of the stream seems kind of dirty. It would’ve been nicer if the Stream::for_each returned IntoFuture<Item=Continue, ...> where Continue is an enum indicating Yes or No (or something like that). Maybe that’s too niche though.


#9

use Stream::take_while maybe a better way in this case.


#10

In this case take_while is probably ok. The more general issue with it though is timeliness - the stream won’t actually terminate until take_while yields false, which is the iteration after when you want the stream to end. And that last iteration may not come for a while (if at all). Tokio - stop listening for new tcp connections conditionally is another case of wanting to terminate the stream ASAP.


#11

Sorry for the late reply. I just stumbled over this thread and a possible solution came to my mind.

The easiest way to quit after 10 ticks in this case would be to limit the interval stream. ticks is an interval stream that produces () every tick. If you want to quit after 10 ticks, simply limit the stream to 10 items by using the take() combinator. When the stream ends, the loop future completes and the reactor stops.

let ticks_future = ticks.take(10).for_each( move || { ... });