Tokio Polling behaviour and application structuring


#1

I am currently working on a test spec implementation, and I figured it would be smart to use tokio to model many parallel test sequences.

Right now I am still not sure how exactly tokio works, despite having read all of the tutorial on tokio.rs.

The code below executes only the first action instead of executing all of the actions in sequence.

I am not sure how to model this properly in Rust. Help would be much appreciated; I am still learning a lot about how to model things with tokio/futures in Rust.

#[macro_use]
extern crate error_chain;

extern crate futures;
extern crate tokio_core;
extern crate tokio_io;
extern crate tokio_periodic;
extern crate tokio_timer;

use futures::stream::Stream;
use tokio_core::reactor::Core;

use futures::future::Future;
use futures::{Async,Poll};

use std::vec::Vec;

/// Error definitions for this example, declared through the `error_chain!` macro.
/// The rest of the file refers to the resulting error type as `error::Error`.
mod error {
error_chain! {
    // Error types from other crates/modules that are linked in as variants.
    foreign_links {
        Fmt(::std::fmt::Error);
        // This link carries a `#[cfg(unix)]` attribute, so it only exists on unix targets.
        Io(::std::io::Error) #[cfg(unix)];
        Timer(::tokio_timer::TimerError);
    }

    // Error kinds defined by this example itself; each one carries a `String` payload
    // that is interpolated into its display text.
    errors {
        Stuff(t: String) {
            description("invalid stuff")
            display("invalid stuff: '{}'", t)
        }

        Things(v: String) {
            description("unknown thing"), // note the ,
            display("unknown thing: '{}'", v), // trailing comma is allowed
        }
    }
}
}




/// The kinds of action a `Squeeze` holds and hands out one at a time.
#[derive(Clone, Debug, Eq, PartialEq)]
enum Action {
    WorkWork,
    DillyDally,
    YodaYada,
}

//impl Action {
//    pub execute() -> Box<Future<Item=(),Error=error::Error>>
//}


use std::collections::VecDeque;


/// A list of pending `Action`s together with the one handed out most recently.
struct Squeeze {
    /// Actions that have not been handed out yet; `next` pops them from the end.
    actions : Vec<Action>,
    /// The value produced by the latest `next` call (`None` until `next` is called,
    /// and `None` again once `actions` is empty).
    current : Option<Action>,
}

impl Squeeze {
    /// Builds a `Squeeze` pre-loaded with `WorkWork`, `DillyDally` and `YodaYada`
    /// (stored in that order) and with no current action.
    pub fn new() -> Self {
        Self {
            actions: vec![Action::WorkWork, Action::DillyDally, Action::YodaYada],
            current: None,
        }
    }

    /// Takes the last stored action, records it in `current`, and returns a copy of it.
    ///
    /// Because the action is popped from the back of the vector, actions come out in
    /// the reverse of the order in which `new` stored them. Returns `None` (and sets
    /// `current` to `None`) when nothing is left.
    pub fn next(&mut self) -> Option<Action> {
        let popped = self.actions.pop();
        self.current = popped.clone();
        popped
    }
}

impl Future for Squeeze {
    type Item = Option<Action>;
    type Error = error::Error;

    /// Pops one action per call and reports the outcome.
    ///
    /// If an action was available it is printed and the future resolves with it.
    /// Returning `Async::Ready` completes the future, so it is not polled again and
    /// only a single action is ever reported. If nothing is left, a message is
    /// printed and `Async::NotReady` is returned.
    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        match self.next() {
            Some(action) => {
                println!("Action {:?}", &action);
                Ok(Async::Ready(Some(action)))
            }
            None => {
                // Reached only when the action list is empty.
                // NOTE(review): nothing here arranges for the task to be polled
                // again after `NotReady` — confirm this is intended.
                println!("No Actions left");
                Ok(Async::NotReady)
            }
        }
    }
}



fn main() {
    let mut core = tokio_core::reactor::Core::new().unwrap();
    let handle = core.handle();

    let work = Squeeze::new();

    core.run(work).unwrap();

    println!("All work is done!");

}

which gives me the following output:

Action YodaYada
All work is done!

Any tips? Input? Suggestions?


#2

Once you return Async::Ready the future is done and will not be polled again. Squeeze is really a Stream, not a Future per se.


#3

How did I miss that…

Thank you so much!

Solution to execute each of the Actions in Sequence :slight_smile:

#[macro_use]
extern crate error_chain;

extern crate futures;
extern crate tokio_core;
extern crate tokio_io;
extern crate tokio_periodic;
extern crate tokio_timer;

use futures::stream::Stream;
use tokio_core::reactor::Core;
use tokio_core::net::TcpListener;

use futures::future::Future;
use futures::{Async,Poll};

use std::vec::Vec;

/// Error definitions for this example, declared through the `error_chain!` macro.
/// The rest of the file refers to the resulting error type as `error::Error`.
mod error {
error_chain! {
    // Error types from other crates/modules that are linked in as variants.
    foreign_links {
        Fmt(::std::fmt::Error);
        // This link carries a `#[cfg(unix)]` attribute, so it only exists on unix targets.
        Io(::std::io::Error) #[cfg(unix)];
        Timer(::tokio_timer::TimerError);
    }

    // Error kinds defined by this example itself; each one carries a `String` payload
    // that is interpolated into its display text.
    errors {
        Stuff(t: String) {
            description("invalid stuff")
            display("invalid stuff: '{}'", t)
        }

        Things(v: String) {
            description("unknown thing"), // note the ,
            display("unknown thing: '{}'", v), // trailing comma is allowed
        }
    }
}
}




/// The kinds of action a `Squeeze` stream yields.
#[derive(Clone, Debug, Eq, PartialEq)]
enum Action {
    WorkWork,
    DillyDally,
    YodaYada,
}


use std::collections::VecDeque;


/// A list of pending `Action`s together with the one handed out most recently.
struct Squeeze {
    /// Actions that have not been handed out yet; `next` pops them from the end.
    actions : Vec<Action>,
    /// The value produced by the latest `next` call (`None` until `next` is called,
    /// and `None` again once `actions` is empty).
    current : Option<Action>,
}

impl Squeeze {
    /// Builds a `Squeeze` holding `WorkWork`, `DillyDally` and `YodaYada`
    /// (stored in that order), with `current` unset.
    pub fn new() -> Self {
        let actions = vec![Action::WorkWork, Action::DillyDally, Action::YodaYada];
        Self {
            actions,
            current: None,
        }
    }

    /// Pops the last stored action, remembers it in `current`, and returns a copy.
    ///
    /// Popping from the back means actions are produced in the reverse of the
    /// order in which `new` stored them. Once the list is empty this returns
    /// `None` and `current` becomes `None` as well.
    pub fn next(&mut self) -> Option<Action> {
        let taken = self.actions.pop();
        self.current = taken.clone();
        taken
    }
}

impl Stream for Squeeze {
    type Item = Action;
    type Error = error::Error;

    /// Yields the next pending action, or signals end of stream when none is left.
    ///
    /// Each call pops one action via `Squeeze::next`. While actions remain, the
    /// action is printed and returned as `Async::Ready(Some(action))`. When the
    /// list is empty, a message is printed and `Async::Ready(None)` is returned
    /// to terminate the stream. This implementation never returns an error.
    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
        match self.next() {
            Some(action) => {
                println!("Action {:?}", &action);
                Ok(Async::Ready(Some(action)))
            }
            None => {
                println!("No Actions left");
                // `Ready(None)` is how a stream reports that it is finished.
                // Returning `NotReady` here without arranging a wake-up would
                // leave consumers such as `for_each` waiting forever.
                Ok(Async::Ready(None))
            }
        }
    }
}



fn main() {
    let mut core = tokio_core::reactor::Core::new().unwrap();
    let handle = core.handle();

    let work = Squeeze::new();

    let work = work.for_each(|action| { println!("Action by Future {:?}", action); futures::future::result::<(),error::Error>(Ok(())) } );
    core.run(work).unwrap();

    println!("All work is done!");

}