Futures-rs: selecting over timeout and channel

        // Mqtt Connection created
        let (stream, _) = connection_lp.run(mqtt_connect).unwrap();

        // Send Ping Requests periodically over existing stream
        let mut timeout_lp = Loop::new().unwrap();
        let timeout = timeout_lp.handle().timeout(Duration::new(4, 0)).map(|t| {
            let packet = _generate_pingreq_packet().unwrap();
            futures_io::write_all(stream, packet)
        });

After creating a connection, I want to send pings to the server periodically over the existing stream. How do I do that with futures? Eventually I would also like to select over a timeout and a channel (to receive data from another thread and send it).

Would love some help on this.

If you're OK with the pings being sent at somewhat imprecise times, you can currently jury-rig a periodic stream like so:

fn periodic(handle: LoopHandle, dur: Duration) -> IoStream<()> {
    let stream = stream::iter(iter::repeat(()).map(Ok));
    stream.and_then(move |()| handle.clone().timeout(dur))
          .and_then(|timeout| timeout)
          .boxed()
}

This is not very precise, however, and you have to keep pulling from the stream to start the next timeout. You can likely make it more precise by using timeout_at. In any case, once you've got a stream of timeouts, you can add the write_all in an and_then combinator on that stream.
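
To illustrate why an absolute deadline helps, here is a minimal sketch using only the standard library. It assumes timeout_at accepts an absolute Instant; the helper name next_deadline is hypothetical and not part of futures-rs. The idea is to compute each deadline from the previous deadline rather than from "now", so that lateness in handling one tick does not push every later tick back.

```rust
use std::time::{Duration, Instant};

/// Hypothetical helper: returns the next ping deadline after `prev`.
/// The schedule stays anchored to `prev`, so a late wakeup does not
/// shift later ticks. Ticks that were already missed are skipped.
fn next_deadline(prev: Instant, period: Duration, now: Instant) -> Instant {
    // A zero period would make the loop below spin forever.
    assert!(period > Duration::new(0, 0), "period must be non-zero");
    let mut next = prev + period;
    while next <= now {
        next += period;
    }
    next
}
```

Each returned Instant could then be handed to timeout_at in place of the relative timeout(dur) call, under the assumption stated above.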

To select over arbitrary events, you'd want to use the select combinator. You can use the timeout function to create a timeout, and oneshot to receive an event from another thread.
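
For comparison, the same "whichever comes first" shape can be sketched without futures at all, using a blocking loop over std::sync::mpsc and recv_timeout. This is not the select combinator, only a standard-library stand-in for the control flow. The names Event and run are hypothetical, and the actual MQTT writes are replaced by recording what would have been sent.

```rust
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::{Duration, Instant};

/// Hypothetical record of what the loop would write to the MQTT stream.
#[derive(Debug, PartialEq)]
enum Event {
    Ping,
    Publish(Vec<u8>),
}

/// Waits for whichever comes first: a payload on `rx` or the ping deadline.
/// Returns the recorded events once every sender has been dropped.
fn run(rx: Receiver<Vec<u8>>, ping_every: Duration) -> Vec<Event> {
    let mut events = Vec::new();
    let mut deadline = Instant::now() + ping_every;
    loop {
        // Time left until the next ping; zero if the deadline already passed.
        let wait = deadline.saturating_duration_since(Instant::now());
        match rx.recv_timeout(wait) {
            // Another thread handed us data to send.
            Ok(payload) => events.push(Event::Publish(payload)),
            // The deadline fired first: record a ping and schedule the next one.
            Err(RecvTimeoutError::Timeout) => {
                events.push(Event::Ping);
                deadline += ping_every;
            }
            // All senders are gone, so there is nothing left to wait for.
            Err(RecvTimeoutError::Disconnected) => return events,
        }
    }
}
```

Receiving a payload does not reset the ping deadline here. Whether it should depends on how the keep-alive is meant to behave, which the thread does not specify.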

Does that make sense? Those are at least the pieces, although it may not be immediately clear how to plug them all together.