Hi all, I'm very much a Rust beginner, so I have been going over the Tokio documentation to try to get a grasp of the async/futures world. There is a section 'Coordinating access to a resource' under the 'Spawning' portion of the docs (https://tokio.rs/docs/futures/spawning/) which is based on calculating the round trip time of an (unimplemented) ping/pong request/response. As a bit of a laugh I thought I'd make my own Pong struct, which implements Future and waits some random time before completing, and return it from the recv_pong method defined on the Transport struct in the web docs.
However, when I run the code below, the program hangs and the last print statement is "waiting for pong...". I assume the issue is in the poll function I defined in impl Future for Pong. My understanding (which may be wrong) is that returning Ok(Async::NotReady) schedules the future to be polled again at a later time, but something appears to be preventing a further poll from occurring and I'm not sure what exactly. If anyone could take a look or has any insights I'd much appreciate it!
extern crate tokio;
extern crate futures;
extern crate rand;
use tokio::io;
use futures::{Future, Stream, Sink, Poll, Async};
use futures::future::lazy;
use futures::sync::{mpsc, oneshot};
use std::time::{Duration, Instant};
use rand::Rng;
// My code
struct Pong {
    start: Instant,
    wait_secs: u64,
}

impl Pong {
    fn new(wait_max_secs: u64) -> Pong {
        let mut rng = rand::thread_rng();
        let wait_secs = rng.gen_range(0, wait_max_secs);
        Pong {
            start: Instant::now(),
            wait_secs
        }
    }
}

impl Future for Pong {
    type Item = ();
    type Error = io::Error;

    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        if self.start.elapsed().as_secs() > self.wait_secs {
            println!("pong received");
            Ok(Async::Ready(()))
        } else {
            println!("waiting for pong...");
            Ok(Async::NotReady)
        }
    }
}
// Slightly modified code from https://tokio.rs/docs/futures/spawning/
type Message = oneshot::Sender<Duration>;

struct Transport;

impl Transport {
    fn send_ping(&self) {
        println!("entering send_ping");
    }

    fn recv_pong(&self) -> impl Future<Item = (), Error = io::Error> {
        println!("entering recv_pong");
        Pong::new(2)
    }
}

fn coordinator_task(rx: mpsc::Receiver<Message>)
    -> impl Future<Item = (), Error = ()>
{
    let transport = Transport;

    rx.for_each(move |pong_tx| {
        let start = Instant::now();
        transport.send_ping();

        transport.recv_pong()
            .map_err(|_| ())
            .and_then(move |_| {
                println!("sending rtt");
                let rtt = start.elapsed();
                pong_tx.send(rtt).unwrap();
                Ok(())
            })
    })
}
/// Request an rtt.
fn rtt(tx: mpsc::Sender<Message>)
    -> impl Future<Item = (Duration, mpsc::Sender<Message>), Error = ()>
{
    let (resp_tx, resp_rx) = oneshot::channel();

    tx.send(resp_tx)
        .map_err(|_| ())
        .and_then(|tx| {
            resp_rx.map(|dur| (dur, tx))
                .map_err(|_| ())
        })
}
fn main() {
    // Start the application
    tokio::run(lazy(|| {
        // Create the channel that is used to communicate with the
        // background task.
        let (tx, rx) = mpsc::channel(1_024);

        // Spawn the background task:
        tokio::spawn(coordinator_task(rx));

        // Spawn a few tasks that use the coordinator to request RTTs.
        for _ in 0..4 {
            let tx = tx.clone();

            tokio::spawn(lazy(|| {
                rtt(tx).and_then(|(dur, _)| {
                    println!("duration = {:?}", dur);
                    Ok(())
                })
            }));
        }

        Ok(())
    }));
}
Output:
entering send_ping
entering recv_pong
waiting for pong...
Errors:
Compiling playground v0.0.1 (/playground)
Finished dev [unoptimized + debuginfo] target(s) in 2.02s
Running `target/debug/playground`
/root/entrypoint.sh: line 8: 7 Killed timeout --signal=KILL ${timeout} "$@"
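The hang fits the usual contract for hand-written futures. Returning "not ready" does not by itself schedule another poll. The future is expected to arrange for its task to be woken, and an executor that is never notified simply never polls again. Below is a minimal sketch of that idea, not a drop-in fix for the code above. To stay free of crate dependencies it uses std::future rather than futures 0.1, and the toy block_on executor, the ThreadWaker type and the Duration-based Pong are hypothetical stand-ins. In futures 0.1 the counterpart of cx.waker().wake_by_ref() should be futures::task::current().notify(), called just before returning Ok(Async::NotReady).

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread;
use std::time::{Duration, Instant};

/// Hypothetical stand-in for the post's `Pong`: ready once `wait` has elapsed.
struct Pong {
    start: Instant,
    wait: Duration,
}

impl Future for Pong {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.start.elapsed() >= self.wait {
            Poll::Ready(())
        } else {
            // Returning Pending alone would leave the task asleep forever.
            // Requesting an immediate wake-up makes the executor poll again.
            // This is a busy loop; a timer would wake only at the deadline.
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

/// Toy waker (an assumption for this sketch): unparks the executor thread.
struct ThreadWaker(thread::Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Toy executor: polls the future, then parks until it is woken.
/// Returns the future's output together with the number of polls.
fn block_on<F: Future>(fut: F) -> (F::Output, usize) {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    let mut polls = 0;
    loop {
        polls += 1;
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return (out, polls);
        }
        // Without a wake-up request from `poll`, this would block forever,
        // which mirrors the hang after "waiting for pong...".
        thread::park();
    }
}

fn main() {
    let started = Instant::now();
    let pong = Pong { start: started, wait: Duration::from_millis(50) };
    let ((), polls) = block_on(pong);
    println!("pong received after {:?} and {} polls", started.elapsed(), polls);
}
```

Waking immediately keeps a core busy until the deadline passes. In a Tokio 0.1 program, wrapping a timer future such as tokio::timer::Delay and forwarding its poll result is probably the more conventional way to wait, since the timer registers the wake-up for the deadline.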