Implementing custom `async-std` read stream

I want my struct Output to wrap a TcpStream (or any other type that implements the Read trait) and act as a custom reader. To do that I implement Read for Output and Read for &Output. When I compile the code below I get "cannot borrow as mutable" errors (marked in the code).

Here's the source code:

pub struct Output {
    stream: Pin<Box<dyn Read + Unpin>>, // do I need to Pin here?
}
impl Output {
    pub fn new<S: 'static>(stream: S) -> Self
        where
        S: Read + Unpin,
    {
        Self {
            stream: Box::pin(stream), // Do I need to pin?
        }
    }
}
impl Read for Output {
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>> {
        Pin::new(&mut &*self).poll_read(cx, buf)
    }
}
impl Read for &Output {
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>> {
        (&mut self.stream).read(buf); // cannot borrow as mutable
        Pin::new(&mut self.stream).poll_read(cx, buf) // cannot borrow as mutable
   }
}

Here's the error:

error[E0596]: cannot borrow data in a dereference of `std::pin::Pin<&mut &Output>` as mutable
   --> src/lib.rs:213:9
    |
213 |         (&mut self.stream).read(buf);
    |         ^^^^^^^^^^^^^^^^^^ cannot borrow as mutable
    |
    = help: trait `DerefMut` is required to modify through a dereference, but it is not implemented for `std::pin::Pin<&mut &Output>`

error[E0596]: cannot borrow `self` as mutable, as it is not declared as mutable
   --> src/lib.rs:213:15
    |
207 |         self: Pin<&mut Self>,
    |         ---- help: consider changing this to be mutable: `mut self`
...
213 |         (&mut self.stream).read(buf);
    |               ^^^^ cannot borrow as mutable

error[E0596]: cannot borrow data in a dereference of `std::pin::Pin<&mut &Output>` as mutable
   --> src/lib.rs:214:18
    |
214 |         Pin::new(&mut self.stream).poll_read(cx, buf)
    |                  ^^^^^^^^^^^^^^^^ cannot borrow as mutable
    |
    = help: trait `DerefMut` is required to modify through a dereference, but it is not implemented for `std::pin::Pin<&mut &Output>`

error[E0596]: cannot borrow `self` as mutable, as it is not declared as mutable
   --> src/lib.rs:214:23
    |
207 |         self: Pin<&mut Self>,
    |         ---- help: consider changing this to be mutable: `mut self`
...
214 |         Pin::new(&mut self.stream).poll_read(cx, buf)
    |                       ^^^^ cannot borrow as mutable

I've tried many different combinations to get self.stream mutably borrowed, but with no success. I'm not sure exactly how pinning/unpinning and boxing/unboxing work here. I'd appreciate some help.

async-std isn't available in the playground, so it isn't as easy for me to create a full example like I can with Tokio, but here are some comments:

  1. Box<T> is always Unpin, whatever T is, so the Unpin bound on the inner type is unnecessary: you just want Pin<Box<dyn Read>>.
  2. To avoid the box, you would need either unsafe or the pin-project crate.
  3. If you want to implement Read on &Output, then &S must also implement Read. You can't do it with just any stream, hence the mutability errors.
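
As a rough illustration of point 1, here is a minimal sketch of the boxed version with the Unpin bound dropped. To keep it free of external crates it declares a local stand-in `Read` trait that only mirrors the `poll_read` signature used in the question; with async-std you would import `async_std::io::Read` instead. The `Cursor` impl, `NoopWaker`, and `poll_once` are hypothetical helpers added just to exercise the code.

```rust
use std::io;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Stand-in for `async_std::io::Read` (assumption: same `poll_read` shape),
// declared locally so the sketch compiles without external crates.
pub trait Read {
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>>;
}

// Hypothetical in-memory source, used only for the demo.
impl Read for io::Cursor<Vec<u8>> {
    fn poll_read(
        mut self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>> {
        // A cursor never blocks, so delegate to the synchronous read.
        Poll::Ready(io::Read::read(&mut *self, buf))
    }
}

pub struct Output {
    // `Pin<Box<T>>` is `Unpin` for every `T`, so no `+ Unpin` is needed here.
    stream: Pin<Box<dyn Read>>,
}

impl Output {
    pub fn new<S: Read + 'static>(stream: S) -> Self {
        Self { stream: Box::pin(stream) }
    }
}

impl Read for Output {
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>> {
        // `Output` is `Unpin`, so the field is reachable through `DerefMut`;
        // `as_mut` re-pins the boxed stream as `Pin<&mut dyn Read>`.
        self.stream.as_mut().poll_read(cx, buf)
    }
}

// Waker that does nothing; enough to poll a reader that is always ready.
struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Polls `output` once and maps `Pending` to a `WouldBlock` error.
pub fn poll_once(output: &mut Output, buf: &mut [u8]) -> io::Result<usize> {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    match Pin::new(output).poll_read(&mut cx, buf) {
        Poll::Ready(result) => result,
        Poll::Pending => Err(io::ErrorKind::WouldBlock.into()),
    }
}
```

Note that `poll_read` takes `mut self` here: mutation goes through `Pin<&mut Output>`, which is exactly what a `&Output` cannot offer.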

Fundamentally your issue is that you are trying to mutate the stream field through an immutable reference.

Yes, exactly, but how do I fix that? At first I tried Pin::into_inner(self) to unpin self:

// unpin self
let mut s = Pin::into_inner(self);
// use mutable stream
(&mut s.stream).read(buf); // still "cannot borrow `s.stream` as mutable, as it is behind a `&` reference"
// should be similar but it doesn't find the poll_read method
Pin::new(&mut s.stream).poll_read(cx, buf) // still "cannot borrow as mutable"

Could you please point me in the right direction here? What am I missing?

@alice I created a quick project for you here so you can try it out.

@alice Maybe you could help me by providing an example for Tokio, and then I'll try to migrate the logic to async-std? I'm really stuck here.

If you are okay without the impl on &Output, you can do it like this:

use std::pin::Pin;
use std::task::{Poll, Context};
use std::io;
use async_std::io::Read;
use pin_project::pin_project;

#[pin_project]
pub struct Output<R: Read> {
    #[pin]
    stream: R,
}
impl<R: Read> Output<R> {
    pub fn new(stream: R) -> Self {
        Self {
            stream,
        }
    }
}
impl<R: Read> Read for Output<R> {
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>> {
        self.project().stream.poll_read(cx, buf)
    }
}
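
For comparison, the unsafe route mentioned in point 2 could look roughly like the sketch below, which projects the pin onto the field by hand with `Pin::map_unchecked_mut` instead of using pin-project. It again relies on a local stand-in `Read` trait (an assumption that mirrors only `poll_read`), and `NoopWaker` and `poll_once` are hypothetical demo helpers. The safety comment lists the invariants you would have to uphold yourself, which is the bookkeeping pin-project does for you.

```rust
use std::io;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Stand-in for `async_std::io::Read` (assumption: same `poll_read` shape).
pub trait Read {
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>>;
}

// Hypothetical in-memory source, used only for the demo.
impl Read for io::Cursor<Vec<u8>> {
    fn poll_read(
        mut self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>> {
        Poll::Ready(io::Read::read(&mut *self, buf))
    }
}

pub struct Output<R> {
    stream: R,
}

impl<R> Output<R> {
    pub fn new(stream: R) -> Self {
        Self { stream }
    }
}

impl<R: Read> Read for Output<R> {
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>> {
        // SAFETY: `stream` is treated as structurally pinned. This type never
        // moves it out of a pinned `Output`, has no `Drop` impl, is not
        // `#[repr(packed)]`, and is `Unpin` only when `R` is (the auto impl).
        let stream = unsafe { self.map_unchecked_mut(|this| &mut this.stream) };
        stream.poll_read(cx, buf)
    }
}

// Waker that does nothing; enough to poll a reader that is always ready.
struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Polls an already pinned reader once; `Pending` becomes `WouldBlock`.
pub fn poll_once<R: Read>(reader: Pin<&mut R>, buf: &mut [u8]) -> io::Result<usize> {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    match reader.poll_read(&mut cx, buf) {
        Poll::Ready(result) => result,
        Poll::Pending => Err(io::ErrorKind::WouldBlock.into()),
    }
}
```

Unlike the boxed version, this places no Unpin bound on R, at the cost of an unsafe block whose invariants the compiler does not check.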

Can we get rid of the pin_project dependency? And why do you find the impl on &Output problematic?

Btw, I have this project where you've already helped me implement a similar thing. I hope you find some more magic for this new problem :).

Here's an updated version from the GitHub repo. I added + Unpin.

use std::pin::Pin;
use async_std::io::Read;
use async_std::task::{Context, Poll};
use async_std::io;
use async_std::net::{TcpStream};
use async_std::prelude::*;

#[async_std::main]
async fn main() {
    let mut stream = TcpStream::connect("google.com:80").await.unwrap();
    stream.write_all(b"GET / HTTP/1.1\r\n").await.unwrap();

    let mut res = Output::new(stream);
    let mut data = Vec::new();
    res.read_to_end(&mut data).await;

    println!("{:?}", data);
}

pub struct Output<R: Read + Unpin> {
    stream: R,
}

impl<R: Read + Unpin> Output<R> {
    pub fn new(stream: R) -> Self {
        Self {
            stream
        }
    }
}

impl<R: Read + Unpin> Read for Output<R> {
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>> {
        Pin::new(&mut &*self).poll_read(cx, buf)
    }
}

impl<R: Read + Unpin> Read for &Output<R> {
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>> {
        // PROBLEM
        let mut stream = &self.stream;
        stream.read(buf);
        stream.poll_read(cx, buf)
    }
}

I'm sure there's a nice workaround for this without using dependencies. Could you please explain why this problem occurs? Can you suggest any other solution?

I guess with Unpin you can do this:

use std::pin::Pin;
use std::marker::Unpin;
use std::task::{Poll, Context};
use std::io;
use async_std::io::Read;

pub struct Output<R> {
    stream: R,
}
impl<R> Unpin for Output<R> {}

impl<R> Output<R> {
    pub fn new(stream: R) -> Self {
        Self {
            stream,
        }
    }
}
impl<R: Read + Unpin> Read for Output<R> {
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>> {
        Pin::new(&mut self.stream).poll_read(cx, buf)
    }
}
impl<'a, R> Read for &'a Output<R>
where
    &'a R: Read,
{
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>> {
        Pin::new(&mut &self.stream).poll_read(cx, buf)
    }
}
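
To show why the `&'a R: Read` bound is what makes the impl on `&Output` work, here is a small dependency-free sketch. It assumes a local stand-in `Read` trait mirroring `poll_read`, and a hypothetical `SharedBytes` reader that keeps its position in a `Cell`, so it can advance through a shared reference, much as `&TcpStream` can read because the socket state lives in the OS. `NoopWaker` and `read_shared` are demo helpers, not part of any library.

```rust
use std::cell::Cell;
use std::io;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Stand-in for `async_std::io::Read` (assumption: same `poll_read` shape).
pub trait Read {
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>>;
}

// Hypothetical reader whose cursor uses interior mutability.
pub struct SharedBytes {
    data: Vec<u8>,
    pos: Cell<usize>,
}

impl SharedBytes {
    pub fn new(data: &[u8]) -> Self {
        Self { data: data.to_vec(), pos: Cell::new(0) }
    }
}

// Reading works through `&SharedBytes` because only the `Cell` is mutated.
impl<'a> Read for &'a SharedBytes {
    fn poll_read(
        self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>> {
        let this: &SharedBytes = *self;
        let start = this.pos.get();
        let n = buf.len().min(this.data.len() - start);
        buf[..n].copy_from_slice(&this.data[start..start + n]);
        this.pos.set(start + n);
        Poll::Ready(Ok(n))
    }
}

pub struct Output<R> {
    stream: R,
}

impl<R> Output<R> {
    pub fn new(stream: R) -> Self {
        Self { stream }
    }
}

// Same shape as the impl above: it exists only when `&R` is itself a reader.
impl<'a, R> Read for &'a Output<R>
where
    &'a R: Read,
{
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>> {
        Pin::new(&mut &self.stream).poll_read(cx, buf)
    }
}

// Waker that does nothing; enough to poll a reader that is always ready.
struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Reads through a shared reference; no `&mut Output` is involved anywhere.
pub fn read_shared(output: &Output<SharedBytes>, buf: &mut [u8]) -> io::Result<usize> {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut reader = output; // the `&Output<_>` itself is the reader
    match Pin::new(&mut reader).poll_read(&mut cx, buf) {
        Poll::Ready(result) => result,
        Poll::Pending => Err(io::ErrorKind::WouldBlock.into()),
    }
}
```

A reader that needs `&mut self` to make progress has no such `&R` impl, so `&Output<R>` gets no `Read` impl for it; that is the same restriction the original E0596 errors were pointing at.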

You are amazing! Thank you!
