use futures01::stream::Stream;
use futures01::sync::oneshot::{channel, Sender, Receiver};
use futures01::Poll;
use futures01::Async;
use tokio::prelude::*;
/// Newtype wrapper around a reader `R`, so that the reader can be exposed as a `Stream` of bytes.
pub struct ByteStream<R>(R);
// Exposes an `AsyncRead` as a futures 0.1 `Stream` that yields at most one byte per poll.
// NOTE(review): this impl is reported not to compile (E0599 on `poll_read`). The manifest in
// this thread pins tokio 0.2.0-alpha.4, and per the final reply `AsyncRead::poll_read` takes
// `Pin<&mut Self>`, so it cannot be called directly on `self.0` as done here.
impl <R: tokio::io::AsyncRead> Stream for ByteStream<R> {
// The same as our future above:
type Item = u8;
type Error = io::Error;
// poll is very similar to our Future implementation, except that
// it returns an `Option<u8>` instead of a `u8`. This is so that the
// Stream can signal that it's finished by returning `None`:
fn poll(&mut self) -> Result<Async<Option<u8>>, io::Error> {
// One-byte scratch buffer: each call asks the reader for a single byte.
let mut buf = [0;1];
match self.0.poll_read(&mut buf) {
Ok(Async::Ready(n)) => {
// By convention, if an AsyncRead says that it read 0 bytes,
// we should assume that it has got to the end, so we signal that
// the Stream is done in this case by returning None:
if n == 0 {
Ok(Async::Ready(None))
} else {
Ok(Async::Ready(Some(buf[0])))
}
},
// No data available yet: pass the not-ready state straight through.
Ok(Async::NotReady) => Ok(Async::NotReady),
// Read errors are forwarded unchanged as the stream's error.
Err(e) => Err(e)
}
}
}
The above code fails to compile:
error[E0599]: no method named `poll_read` found for type `R` in the current scope
--> src/transport/utils.rs:50:22
|
50 | match self.0.poll_read(&mut buf) {
| ^^^^^^^^^ method not found in `R`
|
= help: items from traits can only be used if the type parameter is bounded by the trait
help: the following traits define an item `poll_read`, perhaps you need to restrict type parameter `R` with one of them:
|
39 | impl <R: futures_io::if_std::AsyncRead + futures::io::AsyncRead + tokio::io::AsyncRead> Stream for ByteStream<R> {
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
39 | impl <R: tokio_io::async_read::AsyncRead + futures::io::AsyncRead + tokio::io::AsyncRead> Stream for ByteStream<R> {
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
39 | impl <R: tokio_io::async_read::AsyncRead + futures::io::AsyncRead + tokio::io::AsyncRead> Stream for ByteStream<R> {
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
The error message confused me: the poll_read method does exist in AsyncRead, so why can't the compiler find it?
Check which version of tokio you're using. If you're working with futures 0.1, you will want to be using tokio 0.1 as well (your doc link goes to tokio's 0.2.0 alpha release).
Converting between Tokio 0.2's AsyncRead and futures 0.1's Stream is going to be more complicated.
Probably the easiest way is to write a wrapper that first converts a tokio(0.2)::io::AsyncRead into a futures(0.3)::stream::Stream<Item = std::io::Result<u8>> (this should be very close to what you have above, except with an extra std::task::Context parameter passed through). Then use futures(0.3) compat layer to convert that stream back into a futures(0.1) stream (this blog post should mostly still be relevant about how to activate the compat layer and which methods to use).
Switching to futures 0.3 does not make much difference:
use futures::stream::Stream;
use core::pin::Pin;
use core::task::{ Context, Poll };
use core::task::Poll::Ready;
/// Newtype wrapper around a reader `R`, so that the reader can be exposed as a `Stream` of bytes.
pub struct ByteStream<R>(R);
// The same byte-at-a-time adapter, written against the futures 0.3 `Stream` trait
// (`poll_next` with a pinned receiver and a task `Context`).
// NOTE(review): reported to fail with the same E0599. The final reply in this thread attributes
// it to `poll_read` taking `Pin<&mut Self>`, and suggests an `Unpin` bound on `R` plus calling
// the method through `Pin::new(&mut self.0)`.
impl <R: tokio::io::AsyncRead> Stream for ByteStream<R> {
// NOTE(review): a bare `u8` item leaves no room to surface read errors; an earlier reply
// suggested `Item = std::io::Result<u8>` instead.
type Item = u8;
fn poll_next(self : Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
// One-byte scratch buffer: each call asks the reader for a single byte.
let mut buf = [0;1];
match self.0.poll_read(cx, &mut buf) {
Ready(Ok(n)) => {
// A zero-length read is treated as end of input, which ends the stream.
if n == 0 {
Ready(None)
} else {
Ready(Some(buf[0]))
}
},
// NOTE(review): there is no arm for `Ready(Err(_))`, and only `Ready` is imported above
// (not `Pending`) - confirm this arm really matches `Poll::Pending`.
Pending => Pending
}
}
}
error[E0599]: no method named `poll_read` found for type `R` in the current scope
--> src/stream_extension.rs:15:22
|
15 | match self.0.poll_read(cx, &mut buf) {
| ^^^^^^^^^ method not found in `R`
|
= help: items from traits can only be used if the type parameter is bounded by the trait
help: the following traits define an item `poll_read`, perhaps you need to restrict type parameter `R` with one of them:
|
8 | impl <R: futures_io::if_std::AsyncRead + tokio::io::AsyncRead> Stream for ByteStream<R> {
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
8 | impl <R: tokio_io::async_read::AsyncRead + tokio::io::AsyncRead> Stream for ByteStream<R> {
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
8 | impl <R: tokio_io::async_read::AsyncRead + tokio::io::AsyncRead> Stream for ByteStream<R> {
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
[dependencies.futures-preview]
version = "0.3.0-alpha.18"
default-features = false
features = ["compat", "async-await", "io-compat", "nightly"]
[dependencies]
tokio = "0.2.0-alpha.4"
Oh, this is a rustc diagnostic bug. AsyncRead::poll_read takes Pin<&mut Self>. If you add an Unpin bound (R: tokio::io::AsyncRead + Unpin), then you can call Pin::new(&mut self.0).poll_read in your implementation.