Who calls poll_next() method on Stream ? (Tokio chat example)

In the chat example of tokio, Peer implements Stream and proivdes a poll_next method that checks for new incoming messages from the channel or tcp stream.

My question is, when this method is called ?

Every poll-like method is called by the runtime (in this case, Tokio), either when you explicitly spawn the new task, or when the underlying system notifies runtime that the task is probably ready to proceed.

So when does runtime calls this function ?
I mean if I change the poll_next method to this:

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
    println!("Oops !");
    return Poll::Pending;
} 

The 'Oops !' message will be printed one time. And the line
while let Some(result) = peer.next().await
will never be done because of .await.
So what happens here ?

Add a panic and you will get the stack trace.
[(without looking)] peer.next() returns a future and when it is bound to a task (spawned on executor) the await part will be polling the futures poll() which will be calling poll_next.

Actually the poll_next method of Peer itself, calls the poll_next method of tokio::sync::mpsc::UnboundedReceiver and tokio_util::codec::Framed (which contains the tcp stream).

So the poll_next method of Peer itself is checking for any new message coming from these two locations. I feel its like having 2 water streams joining together and creating a new bigger stream which is Peer.

So every time there is message coming from one of these streams, the Peer.poll_next() method should be called to check both of them, and return the message coming from one of them.

The question is who calls this method whenever there is new message ready on these 2 streams ?

There may be more places; I haven't read the example in detail, but it is called here inside the next() function.

This is actually the problem. I want to know the relation between next() method and poll_next().
Because if I always return Poll::Pending inside the poll_next() , The poll_next() will be ran just one time, and the while let will never be true or false ! Therefore neither the codes after the while nor inside the while will not be ran.
Seems like .await is blocking the code and no matter what happens for tcp stream (even if it disconnects), the poll_next() will never be called again.

Well, let's check the source. The next method is implemented by Tokio, so we'll look for it in its documentation and see that this is the provided method of StreamExt trait, implemented for all Streams. Looking at implementation, we see that this method simply creates the Next structure, and this structure has very simple Future implementation, calling the poll_next on the underlying Stream.

Now, when the poll method of Future is called? That's where the magic of executors begins. To save CPU time, poll method is called by default only once, but the runtime will pass the reference to Context struct, containing the Waker. The future, then, can schedule itself to be waked later, i.e. let the runtime know that it might be able to make progress. Only after receiving this signal, the runtime will call poll again.

Usually, this is handled by the low-level libraries like mio, which can hook into the OS to be woken up on some event. But, if you want to check this yourself, just try adding cx.waker().wake_by_ref() before returning Poll::Pending, and you should see you poll_next function called continuously in the loop.

1 Like

Excellent !
Thank you

This topic was automatically closed 90 days after the last reply. New replies are no longer allowed.