Context in futures 0.2

I used to be able to write something like this:

pub fn interval() -> UnboundedReceiver<()> {

    let (mut tx, rx) = unbounded();

    spawn(move || {
        loop {
            tx.start_send(()).unwrap();
            tx.poll_ready(context).unwrap(); // <- missing Context
            sleep(Duration::from_millis(100));
        }
    });

    rx
}

How can I get a hold of the Context?

1 Like

When were you able to write that? Context being exposed is new in 0.2. Can you expand a bit on what you're trying to do? It's unclear why you need a start_send/poll_ready sequence on an unbounded channel.

I was able to write it pre 0.2 release where I'd use tx.poll_complete().
Now that you mentioned it, I don't think I need unbounded channel at all.

Would executor::block_on(tx.send(())).unwrap(); be equivalent?

Ah ok - I thought you were showing a snippet of what you used to be able to do, including the context.

What are you trying to do? Send some message on a timer?

I'm making a stream of current size of terminal using some utils from termion crate.

Full code (that I stole from somewhere :speak_no_evil:):

pub fn size_stream() -> UnboundedReceiver<(u16, u16)> {

    let (mut tx, rx) = unbounded();

    spawn(move || {
        let mut current_size = (0, 0);
        loop {
            match terminal_size() {
                Ok(new_size) => if new_size != current_size {
                    current_size = new_size;
                    tx.start_send(current_size).unwrap();
                    tx.poll_ready().unwrap();
                },
                Err(e) => {}
            }
            sleep(Duration::from_millis(20));
        }
    });

    rx
}

In the old code (or new one, for that matter) you don’t need the start_send/poll_ready duo - you just use tx.unbounded_send(...) and that’s it.

Are you actually using (need) tokio here? Was the old code?

1 Like

Yes I'm using tokio because of bunch of http requests I'm doing in a background.

Ok I see.

Any idea why the old code uses a background thread to send messages? Is it because the old tokio runtime has a single thread that did IO + execution? Reason I ask is because I don’t think that part should be necessary if you use the 0.2 tokio, where the default execution model is a dedicated IO thread + threadpool for execution. You should be able to use tokio-timer crate to create an Interval that defines your send period. Interval is a Stream, so you can spawn that processing onto the executor, and have each tick send a message on the channel. Something to explore/consider perhaps.

I don't have a requirement for more that just IO thread. I tried to rewrite it using tokio-timer, but it doesn't support v0.2 of futures yet so temporarily I need to use background thread with sleep.

Here's a small example I whipped up using tokio 0.2 and futures 0.1.21 (0.2 is a bit of a mess right now in terms of types moving around, and doesn't play too nicely).

extern crate futures;
extern crate tokio;

use tokio::prelude::*;
use tokio::timer::Interval;
use std::time::{Duration, Instant};

fn main() {
    let (tx, rx) = futures::sync::mpsc::unbounded();
    let tx_fut = Interval::new(Instant::now(), Duration::from_secs(1))
        .map_err(|_| ())
        .for_each(move |_| {
            tx.unbounded_send(std::time::SystemTime::now()).unwrap();
            Ok(())
        });

    let fut = tx_fut.select(rx.for_each(|time| {
        println!("{:?}", time);
        Ok(())
    }).map_err(|_| ())).then(|_| Ok(()));
    tokio::run(fut);
}

I guess I'll have to wait for tokio-timer to play nicely with futures 0.2. Then I could probably remove channels all together and just do something like:

let time_stream = Interval::new(Instant::now(), Duration::from_secs(1))
    .map(|_| std::time::SystemTime::now());

I found the code that I "borrowed" from:

https://github.com/xi-frontend/xi-term/blob/master/src/terminal.rs#L42-L83

I'm not sure how to convert io::stdin().events() iterator into a stream without using a seperate thread.

Since UnboundedSender implements Sink you can also do:

    let tx_fut = Interval::new(Instant::now(), Duration::from_secs(1))
            .map(|_| std::time::SystemTime::now())
            .map_err(|_| ())
            .forward(tx.sink_map_err(|_| ()))
            .map(|_| ());
1 Like

Nice - is sink_map_err new? I don't recall seeing it before.

I have no idea. I only found it recently.