Implement Stream with AsyncRead trait bounds

I'm struggling with the implementation of the Stream trait with AsyncRead bounds that handle custom struct as the associated type. The implementation, I'm trying to accomplish, is a stream that accepts asynchronous input, when some data appears in the input, then data_stream.next().await should return that data to the user of a stream.
I have a working version of the DataStream that polls values of u8 that are wrapped into Cursor to satisfy DataStream trait bounds.
But, have difficulties with implementing the same DataStream with Data as an associated type of polled value.
I would appreciate any input that helps me solve Stream implementation.

[Rust Playground](playground link)

// [dependencies]
// futures = "0.3.21"
// future-utils = "0.12.1"
// tokio = { version = "1.20.1", features = [ "full" ] }

use futures::{AsyncRead, Stream, StreamExt, pin_mut};
use futures::io::Cursor; // Emulates AsyncRead

use core::pin::Pin;
use core::task::{Context, Poll};
use core::task::Poll::{Ready, Pending};

pub struct DataStream<'a, R> {
    stream: &'a mut R
}

impl <'a, R> DataStream <'a, R> {
    fn new(io: &'a mut R) -> Self {
        DataStream { stream: io }
    }
}

#[derive(Debug)]
pub struct Data {
    id: u64,
    payload: Box<[u8]>,
}

impl Data {
    pub fn new(id: u64) -> Self {
        Data { id, payload: Box::new([1, 2, 3]) }
    }
}

// Works, when type Item = u8; 
// impl<'a, R: AsyncRead + Unpin> Stream for DataStream<'a, R> {
//     type Item = u8;

//     fn poll_next(
//         mut self: Pin<&mut Self>, 
//         cx: &mut Context<'_>
//     ) -> Poll<Option<Self::Item>> {
//         let mut buf = [0; 1];
//         match Pin::new(&mut self.stream).poll_read(cx, &mut buf) {
//             Ready(Ok(num_bytes_read)) => {
//                 if num_bytes_read > 0 {
//                     Ready(Some(buf[0]))
//                 } else {
//                     Ready(None)
//                 }
//             },
//             Pending => Pending,
//             Ready(Err(_)) => Ready(None),
//         }
//     }  
// }

// Have issues here...
impl<'a, R: AsyncRead + Unpin> Stream for DataStream<'a, R> {
    type Item = Data;

    fn poll_next(
        mut self: Pin<&mut Self>, 
        cx: &mut Context<'_>
    ) -> Poll<Option<Self::Item>> {
        let mut buf = [0; 1];
        match Pin::new(&mut self.stream).poll_read(cx, &mut buf) {
            Ready(Ok(num_bytes_read)) => {
                if num_bytes_read > 0 {

                    // What is correct way to fetch incoming Data and return it?
                    // How could I convert &mut buf into Data
                    Ready(Some(buf[0]))

                    // error[E0308]: mismatched types
                    //   --> src\test_data_stream.rs:66:32
                    //    |
                    // 66 |                     Ready(Some(buf[0]))
                    //    |                                ^^^^^^ expected struct `Data`, found `u8`

                } else {
                    Ready(None)
                }
            },
            Pending => Pending,
            Ready(Err(_)) => Ready(None),
        }
    }  
}

async fn test_data_stream_with_async_bytes_input() {
    // Data: Sequence of bytes;
    let v: Vec<u8> = (0..10).collect(); 

    // Something, that emulates async input for the DataStream;
    let mut async_read_io = Cursor::new(v); 

    // DataStream consumes async input and yelds bytes...
    let data_stream = DataStream::new(&mut async_read_io); 
    pin_mut!(data_stream);
    while let Some(b) = data_stream.next().await {
        println!("Byte: {:?}", b);
    }
}

async fn test_data_stream_with_async_data_input() {
    // Data: Sequence of Data structs;
    let v: Vec<Data> = vec![Data::new(1), Data::new(2), Data::new(3)]; 
    
    // Something, that emulates async input (implements AsyncRead) for the DataStream;
    let mut async_read_io = Cursor::new(v); 

    // DataStream consumes async input and yelds Data...
    let data_stream = DataStream::new(&mut async_read_io); 
    pin_mut!(data_stream);
    while let Some(b) = data_stream.next().await {
        println!("Byte: {:?}", b);
    }

// error[E0599]: the method `next` exists for struct `Pin<&mut DataStream<'_, futures::io::Cursor<Vec<Data>>>>`, but its trait bounds were not satisfied
//    --> src\test_data_stream.rs:92:37
//     |
// 8   | pub struct DataStream<'a, R> {
//     | ----------------------------
//     | |
//     | doesn't satisfy `_: StreamExt`
//     | doesn't satisfy `_: Stream`
// ...
// 92  |     while let Some(b) = data_stream.next().await {
//     |                                     ^^^^ method cannot be called on `Pin<&mut DataStream<'_, futures::io::Cursor<Vec<Data>>>>` due to unsatisfied trait bounds   
//     |
//    ::: C:\Users\alexander\.rustup\toolchains\stable-x86_64-pc-windows-msvc\lib/rustlib/src/rust\library\core\src\pin.rs:408:1
//     |
// 408 | pub struct Pin<P> {
//     | -----------------
//     | |
//     | doesn't satisfy `_: StreamExt`
//     | doesn't satisfy `_: Stream`
//     |
//     = note: the following trait bounds were not satisfied:
//             `Pin<&mut DataStream<'_, futures::io::Cursor<Vec<Data>>>>: Stream`
//             which is required by `Pin<&mut DataStream<'_, futures::io::Cursor<Vec<Data>>>>: StreamExt`
//             `DataStream<'_, futures::io::Cursor<Vec<Data>>>: Stream`
//             which is required by `DataStream<'_, futures::io::Cursor<Vec<Data>>>: StreamExt`
// note: the following trait must be implemented
//    --> C:\Users\alexander\.cargo\registry\src\github.com-1ecc6299db9ec823\futures-core-0.3.21\src\stream.rs:27:1
//     |
// 27  | / pub trait Stream {
// 28  | |     /// Values yielded by the stream.
// 29  | |     type Item;
// 30  | |
// ...   |
// 97  | |     }
// 98  | | }
//     | |_^
}

#[tokio::main]
async fn main() -> tokio::io::Result<()> {
    test_data_stream_with_async_bytes_input().await;
    test_data_stream_with_async_data_input().await;
    Ok(())
}

If you want to return a single byte, then perhaps you should return this?

Ready(Some(Data {
    id: 0,
    payload: Box::new(buf)
}))

Thank you for the reply, I believe, at this point, the return value is clear to me. I still don't understand the compile error, that points me to the unsatisfied lifetimes and/or trait bounds for Stream and StreamExt.
Why DataStream doesn't satisfy Stream's lifetime (trait bounds)? play.rust-lang.org

For example, I have the following poll_next impl:

impl<'a, R> Stream for DataStream<'a, R>
    where R: AsyncRead + Unpin
{
    type Item = Data;
    fn poll_next(
        mut self: Pin<&mut Self>, 
        cx: &mut Context<'_>
    ) -> Poll<Option<Self::Item>> {        
        let stream = self.get_mut();
        let my_data = stream.poll_next_unpin(cx);
        my_data
    }
}

The following error message I have:

error[E0599]: the method `next` exists for struct `Pin<&mut DataStream<'_, futures::io::Cursor<&[Data; 3]>>>`, but its trait bounds were not satisfied
   --> src\test_data_stream.rs:62:40
    |
12  | / pub struct DataStream<'a, R>
13  | |     // where R: AsyncRead + Unpin
14  | |     // where R: AsyncRead
15  | | {
16  | |     stream: &'a R
17  | | }
    | | -
    | | |
    | |_doesn't satisfy `_: StreamExt`
    |   doesn't satisfy `_: Stream`
...
62  |       while let Some(data) = data_stream.next().await {
    |                                          ^^^^ method cannot be called on `Pin<&mut DataStream<'_, futures::io::Cursor<&[Data; 3]>>>` due to unsatisfied trait bounds     
    |
   ::: C:\Users\alexander\.rustup\toolchains\stable-x86_64-pc-windows-msvc\lib/rustlib/src/rust\library\core\src\pin.rs:408:1
    |
408 |   pub struct Pin<P> {
    |   -----------------
    |   |
    |   doesn't satisfy `_: StreamExt`
    |   doesn't satisfy `_: Stream`
    |
    = note: the following trait bounds were not satisfied:
            `Pin<&mut DataStream<'_, futures::io::Cursor<&[Data; 3]>>>: Stream`
            which is required by `Pin<&mut DataStream<'_, futures::io::Cursor<&[Data; 3]>>>: StreamExt`
            `DataStream<'_, futures::io::Cursor<&[Data; 3]>>: Stream`
            which is required by `DataStream<'_, futures::io::Cursor<&[Data; 3]>>: StreamExt`
note: the following trait must be implemented
   --> C:\Users\alexander\.cargo\registry\src\github.com-1ecc6299db9ec823\futures-core-0.3.21\src\stream.rs:27:1
    |
27  | / pub trait Stream {
28  | |     /// Values yielded by the stream.
29  | |     type Item;
30  | |
...   |
97  | |     }
98  | | }
    | |_^

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.