Beginner confusion with single call to poll in Tokio/Futures

Hi all, I'm very much a Rust beginner, so I have been going over the Tokio documentation to try to get a grasp of the async/futures world. There is a section 'Coordinating access to a resource' under the 'Spawning' portion of the docs (https://tokio.rs/docs/futures/spawning/) which is based on calculating the round trip time of an (unimplemented) ping/pong request/response. As a bit of a laugh I thought I'd make my own Pong struct, which implements Future and waits some random time before completing, and return it from the recv_pong method defined on the Transport struct in the web docs.

However when I run the below code the program hangs and the last print statement is "waiting for pong...". I assume the issue is in the poll function I defined in impl Future for Pong. My understanding (which may be wrong) is that when I return Ok(Async::NotReady) it schedules the future to be polled again at a later time but something appears to be obstructing a further poll occurring and I'm not sure what exactly. If anyone could take a look or has any insights I'd much appreciate it!

extern crate tokio;
extern crate futures;
extern crate rand;

use tokio::io;
use futures::{Future, Stream, Sink, Poll, Async};
use futures::future::lazy;
use futures::sync::{mpsc, oneshot};
use std::time::{Duration, Instant};
use rand::Rng;

// My code
struct Pong {
    start: Instant,
    wait_secs: u64,
}

impl Pong {
    fn new(wait_max_secs: u64) -> Pong {
        let mut rng = rand::thread_rng();
        let wait_secs = rng.gen_range(0, wait_max_secs);
        
        Pong {
            start: Instant::now(),
            wait_secs
        }
    }
}

impl Future for Pong {
    type Item = ();
    type Error = io::Error;

    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        if self.start.elapsed().as_secs() > self.wait_secs {
            println!("pong received");
            Ok(Async::Ready(()))
        } else {
            println!("waiting for pong...");
            Ok(Async::NotReady)
        }
    }
}


// Slightly modified code from https://tokio.rs/docs/futures/spawning/
type Message = oneshot::Sender<Duration>;

struct Transport;

impl Transport {
    fn send_ping(&self) {
        println!("entering send_ping");
    }

    fn recv_pong(&self) -> impl Future<Item = (), Error = io::Error> {
        println!("entering recv_pong");
        Pong::new(2)
    }
}

fn coordinator_task(rx: mpsc::Receiver<Message>)
-> impl Future<Item = (), Error = ()>
{
    let transport = Transport;

    rx.for_each(move |pong_tx| {
        let start = Instant::now();

        transport.send_ping();

        transport.recv_pong()
            .map_err(|_| ())
            .and_then(move |_| {
                println!("sending rtt");
                let rtt = start.elapsed();
                pong_tx.send(rtt).unwrap();
                Ok(())
            })
    })
}

/// Request an rtt.
fn rtt(tx: mpsc::Sender<Message>)
-> impl Future<Item = (Duration, mpsc::Sender<Message>), Error = ()>
{
    let (resp_tx, resp_rx) = oneshot::channel();

    tx.send(resp_tx)
        .map_err(|_| ())
        .and_then(|tx| {
            resp_rx.map(|dur| (dur, tx))
                .map_err(|_| ())
        })
}

fn main() {
    // Start the application
    tokio::run(lazy(|| {
        // Create the channel that is used to communicate with the
        // background task.
        let (tx, rx) = mpsc::channel(1_024);
    
        // Spawn the background task:
        tokio::spawn(coordinator_task(rx));
    
        // Spawn a few tasks that use the coordinator to request RTTs.
        for _ in 0..4 {
            let tx = tx.clone();
    
            tokio::spawn(lazy(|| {
                rtt(tx).and_then(|(dur, _)| {
                    println!("duration = {:?}", dur);
                    Ok(())
                })
            }));
        }
    
        Ok(())
    }));
}


Output:

entering send_ping
entering recv_pong
waiting for pong...

Errors:

   Compiling playground v0.0.1 (/playground)
    Finished dev [unoptimized + debuginfo] target(s) in 2.02s
     Running `target/debug/playground`
/root/entrypoint.sh: line 8:     7 Killed                  timeout --signal=KILL ${timeout} "$@"

I'm only slightly exposed to the Rust async model, so I apologize if any of my terminology is wrong. But essentially the issue here is that the tokio executor will only poll your future if it thinks that doing so will yield a change in the polling status (not ready to ready). Typically, this is done by registering some sort of notification channel with the operating system that notifies on your I/O request when it's completed. This way, the CPU thread can go to sleep rather than continuously polling the I/O request (this is good for power and resource utilization, even if it may increase latency).

So you essentially need to tell tokio continuously that if it were to poll your future again, it may complete. After all, that's essentially how you've structured your Future - any later poll may be far enough in the future for the task to have completed, but you have no external signal to indicate this. What I would do is this:

// add this to the top of the file
use tokio::prelude::*;

// One line addition to impl Future for Pong
impl Future for Pong {
    type Item = ();
    type Error = io::Error;

    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        if self.start.elapsed().as_secs() > self.wait_secs {
            println!("pong received");
            Ok(Async::Ready(()))
        } else {
            println!("waiting for pong...");
            task::current().notify(); // ADDED THIS LINE
            Ok(Async::NotReady)
        }
    }
}

Essentially, this will notify your task every time it's polled, so it will always be immediately requeued for polling by tokio. Note that this will result in tokio continuously polling your future until it completes, though other, real tokio work will be interleaved between polls. It will result in high CPU utilization, though.
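To get a feel for how often such a self-notifying future gets polled, here is a minimal sketch that counts polls. It is written against today's std::future API rather than the futures 0.1 API used in this thread, on the assumption that cx.waker().wake_by_ref() plays the role of task::current().notify(). BusyPong, ThreadWaker, and block_on_counting are hypothetical names, and the tiny single-future executor only stands in for tokio.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread;
use std::time::{Duration, Instant};

// Hypothetical stand-in for `Pong` that wakes itself on every unready poll.
struct BusyPong {
    deadline: Instant,
}

impl Future for BusyPong {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if Instant::now() >= self.deadline {
            Poll::Ready(())
        } else {
            // Assumed counterpart of `task::current().notify()`:
            // ask the executor to poll this task again right away.
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

// Waker that unparks the executor thread.
struct ThreadWaker(thread::Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

// Tiny single-future executor: returns the output and the number of polls.
fn block_on_counting<F: Future>(fut: F) -> (F::Output, u64) {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    let mut polls = 0u64;
    loop {
        polls += 1;
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return (out, polls);
        }
        // Returns immediately if a wake-up already arrived during the poll,
        // which is always the case for `BusyPong`.
        thread::park();
    }
}

fn main() {
    let deadline = Instant::now() + Duration::from_millis(20);
    let ((), polls) = block_on_counting(BusyPong { deadline });
    println!("ready after {} polls", polls);
}
```

Because the future wakes itself before every Pending, the park call never actually sleeps, so the loop spins until the deadline passes. The printed poll count is a rough measure of the CPU cost mentioned above.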

Ideally, for most sources of asynchrony you will have some sort of signal from the OS you can use to notify your task rather than requiring polling in a busy loop, so this solution doesn't generalize to adapting other I/O sources to Rust's task model.

If somebody else is more familiar with tokio, please let me know if this approach is incorrect.

RE: deleted comments

I had mistakenly thought you were trying to establish multiple ping-pongs in parallel, but I see now that the coordinator_task is set up to only do one ping-pong at a time.

No, there's no magical scheduling. When you return NotReady, it stops polling.

You must keep a copy of task::current() and call task.notify() when the future becomes ready. It's up to you to figure out how to do that.

You can return NotReady only if:

  • you've called poll() on some other Future and it has returned NotReady, or
  • you've saved a copy of task::current() and arranged somehow to call task.notify() later.

If you "just" return NotReady, that's a bug.
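The second bullet can be sketched roughly as follows. This is a minimal illustration against today's std::future API, not the futures 0.1 API used in this thread; I am assuming that cx.waker().clone() corresponds to saving task::current() and that wake() corresponds to notify(). TimerPong, Shared, ThreadWaker, and block_on are hypothetical names, and a sleeping thread stands in for whatever external signal a real transport would provide.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};
use std::thread;
use std::time::{Duration, Instant};

// State shared between the future and its timer thread.
struct Shared {
    done: bool,
    waker: Option<Waker>,
}

// Hypothetical stand-in for `Pong`: becomes ready once `wait` has passed.
struct TimerPong {
    shared: Arc<Mutex<Shared>>,
}

impl TimerPong {
    fn new(wait: Duration) -> TimerPong {
        let shared = Arc::new(Mutex::new(Shared { done: false, waker: None }));
        let for_timer = Arc::clone(&shared);
        // The "external signal": sleep, mark the future done, then wake the task.
        thread::spawn(move || {
            thread::sleep(wait);
            let mut state = for_timer.lock().unwrap();
            state.done = true;
            if let Some(waker) = state.waker.take() {
                waker.wake();
            }
        });
        TimerPong { shared }
    }
}

impl Future for TimerPong {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        let mut state = self.shared.lock().unwrap();
        if state.done {
            Poll::Ready(())
        } else {
            // Save the waker *before* returning Pending, so the timer
            // thread has something to call when the wait is over.
            state.waker = Some(cx.waker().clone());
            Poll::Pending
        }
    }
}

// Waker that unparks the executor thread.
struct ThreadWaker(thread::Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

// Tiny single-future executor: returns the output and the number of polls.
fn block_on<F: Future>(fut: F) -> (F::Output, u32) {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    let mut polls = 0;
    loop {
        polls += 1;
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return (out, polls);
        }
        thread::park(); // sleep until some waker unparks this thread
    }
}

fn main() {
    let start = Instant::now();
    let ((), polls) = block_on(TimerPong::new(Duration::from_millis(50)));
    println!("pong received after {:?} in {} poll(s)", start.elapsed(), polls);
}
```

The mutex is what makes this safe against a lost wake-up: either the poll sees done already set, or the timer thread sees the stored waker. Unlike a future that notifies itself on every poll, the executor thread here sleeps in park until the timer fires.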

Thank you both for the quick and helpful replies; it looks like I was completely off the mark with my understanding! Also, if anyone else out there is or was similarly confused, I found Jon Gjengset's YouTube video 'The What and How of Futures and async/await in Rust' very illuminating.
