When were you able to write that? Context being exposed is new in 0.2. Can you expand a bit on what you're trying to do? It's unclear why you need a start_send/poll_ready sequence on an unbounded channel.
Any idea why the old code uses a background thread to send messages? Is it because the old tokio runtime had a single thread that did both IO and execution? I ask because I don't think that part should be necessary if you use the 0.2 tokio, where the default execution model is a dedicated IO thread plus a threadpool for execution. You should be able to use the tokio-timer crate to create an Interval that defines your send period. Interval is a Stream, so you can spawn that processing onto the executor and have each tick send a message on the channel. Something to explore/consider perhaps.
I don't have a requirement for more than just an IO thread. I tried to rewrite it using tokio-timer, but it doesn't support v0.2 of futures yet, so temporarily I need to use a background thread with sleep.
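For reference, the interim workaround described here (a background thread that sleeps between sends) could look roughly like the sketch below. It is only an illustration under assumptions: it uses the standard library's `std::sync::mpsc::channel` (which is also unbounded) in place of the futures channel, and the names `spawn_ticker`, `period`, and `ticks` are hypothetical, not taken from the original code.

```rust
use std::sync::mpsc::{self, Receiver};
use std::thread;
use std::time::{Duration, SystemTime};

/// Hypothetical helper: spawns a background thread that sends the current
/// time once per `period`, `ticks` times in total. The sender is dropped
/// when the thread finishes, which closes the channel.
fn spawn_ticker(period: Duration, ticks: usize) -> Receiver<SystemTime> {
    // std's `channel` is unbounded, so `send` never blocks on capacity.
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        for _ in 0..ticks {
            // Stop early if the receiving side has gone away.
            if tx.send(SystemTime::now()).is_err() {
                break;
            }
            thread::sleep(period);
        }
    });
    rx
}

fn main() {
    let rx = spawn_ticker(Duration::from_millis(100), 3);
    // Iteration ends once the background thread drops the sender.
    for time in rx {
        println!("{:?}", time);
    }
}
```

The tick count is bounded here only so the example terminates; a real sender would presumably loop until the receiver is dropped.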
Here's a small example I whipped up using tokio 0.2 and futures 0.1.21 (0.2 is a bit of a mess right now in terms of types moving around, and doesn't play too nicely).
extern crate futures;
extern crate tokio;

use tokio::prelude::*;
use tokio::timer::Interval;
use std::time::{Duration, Instant};

fn main() {
    let (tx, rx) = futures::sync::mpsc::unbounded();

    // Producer: on every one-second tick, send the current time on the channel.
    let tx_fut = Interval::new(Instant::now(), Duration::from_secs(1))
        .map_err(|_| ())
        .for_each(move |_| {
            tx.unbounded_send(std::time::SystemTime::now()).unwrap();
            Ok(())
        });

    // Consumer: print each received time. `select` drives both futures and
    // completes as soon as either one finishes.
    let fut = tx_fut.select(rx.for_each(|time| {
        println!("{:?}", time);
        Ok(())
    }).map_err(|_| ())).then(|_| Ok(()));

    tokio::run(fut);
}
I guess I'll have to wait for tokio-timer to play nicely with futures 0.2. Then I could probably remove channels altogether and just do something like:
let time_stream = Interval::new(Instant::now(), Duration::from_secs(1))
    .map(|_| std::time::SystemTime::now());