Convert tokio::tcp::TcpStream to a future stream

I am trying to convert tokio::net::tcp::TcpStream to a futures Stream by following this guide.

use futures01::stream::Stream;
use futures01::sync::oneshot::{channel, Sender, Receiver};
use futures01::Poll;
use futures01::Async;
use tokio::prelude::*;

pub struct ByteStream<R>(R);

impl <R: tokio::io::AsyncRead> Stream for ByteStream<R> {

    // The same as our future above:
    type Item = u8;
    type Error = io::Error;

    // poll is very similar to our Future implementation, except that
    // it returns an `Option<u8>` instead of a `u8`. This is so that the
    // Stream can signal that it's finished by returning `None`:
    fn poll(&mut self) -> Result<Async<Option<u8>>, io::Error> {
        let mut buf = [0;1];
        match self.0.poll_read(&mut buf) {
            Ok(Async::Ready(n)) => {
                // By convention, if an AsyncRead says that it read 0 bytes,
                // we should assume that it has got to the end, so we signal that
                // the Stream is done in this case by returning None:
                if n == 0 {
                    Ok(Async::Ready(None))
                } else {
                    Ok(Async::Ready(Some(buf[0])))
                }
            },
            Ok(Async::NotReady) => Ok(Async::NotReady),
            Err(e) => Err(e)
        }
    }
}

The above code fails on compliation.

error[E0599]: no method named `poll_read` found for type `R` in the current scope
  --> src/transport/utils.rs:50:22
   |
50 |         match self.0.poll_read(&mut buf) {
   |                      ^^^^^^^^^ method not found in `R`
   |
   = help: items from traits can only be used if the type parameter is bounded by the trait
help: the following traits define an item `poll_read`, perhaps you need to restrict type parameter `R` with one of them:
   |
39 | impl <R: futures_io::if_std::AsyncRead + futures::io::AsyncRead + tokio::io::AsyncRead> Stream for ByteStream<R> {
   |       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
39 | impl <R: tokio_io::async_read::AsyncRead + futures::io::AsyncRead + tokio::io::AsyncRead> Stream for ByteStream<R> {
   |       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
39 | impl <R: tokio_io::async_read::AsyncRead + futures::io::AsyncRead + tokio::io::AsyncRead> Stream for ByteStream<R> {
   |       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

The error message confused me.. poll_read method does exist in AsyncRead, but why it cannot find it?

Thank you

Check which version of tokio you're using, if you're working with futures 0.1 you will want to be using tokio 0.1 as well (your doc link goes to tokio's 0.2.0 alpha release).

Hi @Nemo157

Yep, I am using 0.2.0.alpha for its async support.
I see TcpStream implements tokio::io::AsyncRead trait. Do I need AsyncRead from futures?

Converting between Tokio 0.2's AsyncRead and future 0.1's Stream is going to be more complicated.

Probably the easiest way is to write a wrapper that first converts a tokio(0.2)::io::AsyncRead into a futures(0.3)::stream::Stream<Item = std::io::Result<u8>> (this should be very close to what you have above, except with an extra std::task::Context parameter passed through). Then use futures(0.3) compat layer to convert that stream back into a futures(0.1) stream (this blog post should mostly still be relevant about how to activate the compat layer and which methods to use).

Thank you, @Nemo157

It does not make much difference to use 0.3 futures.

use futures::stream::Stream;
use core::pin::Pin;
use core::task::{ Context, Poll };
use core::task::Poll::Ready;

pub struct ByteStream<R>(R);

impl <R: tokio::io::AsyncRead> Stream for ByteStream<R> {

    type Item = u8;


    fn poll_next(self : Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        let mut buf = [0;1];
        match self.0.poll_read(cx, &mut buf) {
            Ready(Ok(n)) => {

                if n == 0 {
                    Ready(None)
                } else {
                    Ready(Some(buf[0]))
                }
            },
            Pending => Pending
        }
    }
}
error[E0599]: no method named `poll_read` found for type `R` in the current scope
  --> src/stream_extension.rs:15:22
   |
15 |         match self.0.poll_read(cx, &mut buf) {
   |                      ^^^^^^^^^ method not found in `R`
   |
   = help: items from traits can only be used if the type parameter is bounded by the trait
help: the following traits define an item `poll_read`, perhaps you need to restrict type parameter `R` with one of them:
   |
8  | impl <R: futures_io::if_std::AsyncRead + tokio::io::AsyncRead> Stream for ByteStream<R> {
   |       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
8  | impl <R: tokio_io::async_read::AsyncRead + tokio::io::AsyncRead> Stream for ByteStream<R> {
   |       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
8  | impl <R: tokio_io::async_read::AsyncRead + tokio::io::AsyncRead> Stream for ByteStream<R> {
   |       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
[dependencies.futures-preview]
version = "0.3.0-alpha.18"
default-features = false
features = ["compat", "async-await", "io-compat", "nightly"]

[dependencies]
tokio = "0.2.0-alpha.4"

Oh, this is a rustc diagnostic bug. AsyncRead::poll_read takes Pin<&mut Self>. If you add R: + Unpin then you can call Pin::new(&mut self.0).poll_read in your implementation.

1 Like
use futures::stream::Stream;
use core::pin::Pin;
use core::task::{ Context, Poll };
use core::task::Poll::{ Ready, Pending };

pub struct ByteStream<R>(R);

impl <R: tokio::io::AsyncRead + Unpin> Stream for ByteStream<R> {

    type Item = u8;


    fn poll_next(self : Pin<&mut Self> , cx: &mut Context) -> Poll<Option<Self::Item>> {
        let mut buf = [0;1];
        match Pin::new(&mut self.0).poll_read(cx, &mut buf) {
            Ready(Ok(n)) => {
                if n == 0 {
                    Ready(None)
                } else {
                    Ready(Some(buf[0]))
                }
            },
            Ready(Err(_)) => {
                Ready(None)
            },
            Pending => Pending
        }
    }
}

Seems there is no way to workaround it.

error[E0596]: cannot borrow `self` as mutable, as it is not declared as mutable
  --> src/stream_extension.rs:15:29
   |
13 |     fn poll_next(self : Pin<&mut Self> , cx: &mut Context) -> Poll<Option<Self::Item>> {
   |                  ---- help: consider changing this to be mutable: `mut self`
14 |         let mut buf = [0;1];
15 |         match Pin::new(&mut self.0).poll_read(cx, &mut buf) {
   |                             ^^^^ cannot borrow as mutable

Hence I have to use unsafe to convert to mutable.

impl <R: tokio::io::AsyncRead + Unpin> Stream for ByteStream<R> {

    type Item = u8;


    fn poll_next(self : Pin<&mut Self> , cx: &mut Context) -> Poll<Option<Self::Item>> {
        let mut buf = [0;1];
        let r : Pin<&mut R>;
        unsafe {
            let const_ptr = &self.0 as *const R;
            let mut_ptr = const_ptr as *mut R;
            r = Pin::new(&mut *mut_ptr);
        }
        match r.poll_read(cx, &mut buf) {
            Ready(Ok(n)) => {
                if n == 0 {
                    Ready(None)
                } else {
                    Ready(Some(buf[0]))
                }
            },
            Ready(Err(_)) => {
                Ready(None)
            },
            Pending => Pending
        }
        
    }
}

Is there a better way ? :slight_smile:

Following this advice from the compiler makes it compile just fine for me.

I would even clean it up a bit like this:

use futures::{ stream::Stream, ready };
use core::pin::Pin;
use core::task::{ Context, Poll };


pub struct ByteStream<R>(R);

impl <R: tokio::io::AsyncRead + Unpin> Stream for ByteStream<R> 
{
   type Item = u8;


   fn poll_next(mut self : Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> 
   {
      let mut buf = [0;1];

      match ready!( Pin::new(&mut self.0).poll_read(cx, &mut buf) ) 
      {
         Ok(n) if n != 0 => Some(buf[0]).into() ,
         _               => None        .into() ,
      }
   }
}
2 Likes

Thank you both, @Nemo157 @najamelan :star_struck:

This topic was automatically closed 90 days after the last reply. New replies are no longer allowed.