Tokio: How to spawn two (or more) Interval streams into the same handle?

I'm trying to use the tokio event loop to spawn futures at different intervals. Consider the following function


pub fn sample_interval(dur: Duration,
                       handle: &Handle,
                       pool: &CpuPool)
                       -> Box<Future<Item = (), Error = io::Error>> {
    let interval = Interval::new(dur, handle).unwrap();
    let temp = TempSensor::new();
    let pool = pool.clone();
    let int_stream = interval.for_each(move |_| {
        let sample = temp.sample();
        Box::new(pool.spawn(myfut(sample)))
    });
    Box::new(int_stream)
}

Here, at every interval I want to run sample() to get some metrics and then send the result to a CpuPool to be processed. This function works and I'm using it in this way:

fn main() {
    let mut core = Core::new().unwrap();
    let handle = core.handle();
    let pool = CpuPool::new_num_cpus();

    let temp_stream = temp_sensor::sample_interval(Duration::from_millis(500), &handle, &pool);
    let freq_stream = freq_sensor::sample_interval(Duration::from_millis(1000), &handle);
    handle.spawn(temp_stream.map_err(|_| ()));
    handle.spawn(freq_stream.map_err(|_| ()));

    core.run(futures::future::empty::<(), ()>()).unwrap();
}

I managed to get this code working, however I'm pretty sure that there's a better way. My main concerns are:

  • Should I need to Box the Stream before return the sample_interval function? If I remove that then it's a compiler nightmare that I don't know how to solve.
  • The map_err(|_| ()) was copied from some repo in github, but I don't know why I need that, I would love just to have something like handle.spawn(temp_stream)

Thank you very much! I'll appreciate any comments/advice

If you're using nightly, you might get away with returning impl Future instead of a boxed one. The price you pay for boxing is a level of indirection, which is not a bad tradeoff if the alternative is dealing with Lovecraftian types.

See the signature of Handle::spawn: it expects a Future with the Error type of (), so you have to transform other error types to that one.


One purely stylistic remark: for an infinitely polled event loop I prefer to use loop { core.turn(None) } because it's nicer to look at.

Thanks @inejge !

Yes, definitely the boxed return is better that the ominous alternative :smile:.
I changed the return to Box<Future<Item = (), Error = ()>> and now I can have the nicer handle.spawn(temp_stream) calls in the main function.

I also added the loop you suggested.

Thanks again!

I don’t think you need the inner Box to wrap the CpuPool future though. That seems like a needless allocation at a decent frequency.

I personally prefer the empty() future rather than a manual turn loop - why write imperative code there if the alternative is more to the point? :slight_smile: