Tokio: How to spawn two (or more) Interval streams into the same handle?


#1

I’m trying to use the tokio event loop to spawn futures at different intervals. Consider the following function:


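use std::io;
use std::time::Duration;

use futures::{Future, Stream};
use futures_cpupool::CpuPool;
use tokio_core::reactor::{Handle, Interval};

pub fn sample_interval(dur: Duration,
                       handle: &Handle,
                       pool: &CpuPool)
                       -> Box<Future<Item = (), Error = io::Error>> {
    // Ticks once every `dur` on the event loop behind `handle`.
    let interval = Interval::new(dur, handle).unwrap();
    let temp = TempSensor::new();
    let pool = pool.clone();
    let int_stream = interval.for_each(move |_| {
        // Take a reading on each tick and hand it to the pool for processing.
        let sample = temp.sample();
        Box::new(pool.spawn(myfut(sample)))
    });
    Box::new(int_stream)
}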

Here, at every interval I want to run sample() to get some metrics and then send the result to a CpuPool to be processed. This function works, and I’m using it like this:

fn main() {
    let mut core = Core::new().unwrap();
    let handle = core.handle();
    let pool = CpuPool::new_num_cpus();

    let temp_stream = temp_sensor::sample_interval(Duration::from_millis(500), &handle, &pool);
    let freq_stream = freq_sensor::sample_interval(Duration::from_millis(1000), &handle);
    handle.spawn(temp_stream.map_err(|_| ()));
    handle.spawn(freq_stream.map_err(|_| ()));

    core.run(futures::future::empty::<(), ()>()).unwrap();
}

I managed to get this code working; however, I’m pretty sure that there’s a better way. My main concerns are:

  • Do I need to Box the Stream before returning it from the sample_interval function? If I remove the Box, it’s a compiler nightmare that I don’t know how to solve.
  • The map_err(|_| ()) was copied from some repo on GitHub, but I don’t know why I need it. I would love to just have something like handle.spawn(temp_stream).

Thank you very much! I’d appreciate any comments or advice.


#2

If you’re using nightly, you might get away with returning impl Future instead of a boxed one. The price you pay for boxing is a level of indirection, which is not a bad tradeoff if the alternative is dealing with Lovecraftian types.
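
To make the tradeoff concrete, here is a minimal sketch that uses std iterators as a stand-in for futures, since closure-based adaptor chains produce the same kind of unnameable types. The function names evens_boxed and evens_impl are made up for illustration, and the Box is written with the dyn keyword that current compilers expect. Note that impl Trait in return position has been available on stable since Rust 1.26, so nightly should no longer be required for this.

```rust
// Analogy using std iterators instead of futures: the adaptor chain below
// contains closures, so its concrete type cannot be written out by hand.

// Boxed: one heap allocation and dynamic dispatch, but a simple signature.
pub fn evens_boxed(limit: u32) -> Box<dyn Iterator<Item = u32>> {
    Box::new((0..limit).filter(|n| n % 2 == 0).map(|n| n * 10))
}

// `impl Trait`: the compiler fills in the concrete type, with no allocation.
pub fn evens_impl(limit: u32) -> impl Iterator<Item = u32> {
    (0..limit).filter(|n| n % 2 == 0).map(|n| n * 10)
}
```

Both functions yield the same items; only the return type and the extra allocation differ. The same pattern should carry over to a function returning a future, assuming the compiler in use supports impl Trait.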

See the signature of Handle::spawn: it expects a Future with the Error type of (), so you have to transform other error types to that one.
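
As a rough illustration of why the conversion is needed, the sketch below uses plain Result values and closures rather than tokio futures. Everything in it is hypothetical: spawn_like only imitates the "Error must be ()" requirement of Handle::spawn, and sample_once stands in for work that fails with an io::Error. It also shows that map_err can log the error before discarding it, instead of silently dropping it with |_| ().

```rust
use std::io;

// Hypothetical stand-in for `Handle::spawn`: like the real method, it only
// accepts work whose success and error types are both `()`.
pub fn spawn_like<F>(task: F) -> bool
where
    F: FnOnce() -> Result<(), ()>,
{
    task().is_ok()
}

// Hypothetical task that can fail with an `io::Error`, like the interval stream.
pub fn sample_once(fail: bool) -> Result<(), io::Error> {
    if fail {
        Err(io::Error::new(io::ErrorKind::Other, "sensor read failed"))
    } else {
        Ok(())
    }
}

// Passing `move || sample_once(fail)` directly would not type-check, because
// `spawn_like` wants `Result<(), ()>`. The error is therefore logged and
// mapped to `()` first; `eprintln!` evaluates to `()`, so the closure does too.
pub fn run(fail: bool) -> bool {
    spawn_like(move || sample_once(fail).map_err(|e| eprintln!("sample failed: {}", e)))
}
```

Here run(false) reports success and run(true) reports failure after writing the error to stderr.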


One purely stylistic remark: for an infinitely polled event loop I prefer to use loop { core.turn(None) } because it’s nicer to look at.


#3

Thanks @inejge !

Yes, the boxed return is definitely better than the ominous alternative :smile:.
I changed the return to Box<Future<Item = (), Error = ()>> and now I can have the nicer handle.spawn(temp_stream) calls in the main function.

I also added the loop you suggested.

Thanks again!


#4

I don’t think you need the inner Box to wrap the CpuPool future though. That seems like a needless allocation at a decent frequency.

I personally prefer the empty() future rather than a manual turn loop - why write imperative code there if the alternative is more to the point? :slight_smile: