Extending or replacing the future returned by `copy_into`

Hi,

To gain some experience with the new async/await and Runtime, I'm trying to extend the sample TCP proxy in the Runtime examples. The goal is to transform the received bytes before transmitting them. A simple example is capitalizing all the letters (assuming all the bytes are "letters", of course). A more useful application is applying a stream cipher.

In the example, the proxying in a given direction is done in a single line via the copy_into() function on a ReadHalf instance. copy_into() returns a future called CopyInto--a wrapper around another future called CopyBufInto which copies all the data from the Reader into the Writer until EOF on the Reader occurs.

One possible solution is replacing the call to copy_into() with the creation of a custom future, modeled on CopyBufInto, but with additional transform logic. To get started on that, I copied the source of the CopyBufInto future into the sample. That resulted in a bunch of compilation failures that looked tough to resolve. Before I start plowing through those: is creating a custom future a good approach to getting enhanced "copy into" functionality? Or is there some other extension mechanism?

Thanks,

Chuck

A cleaner solution is probably going to be to wrap either the Writer or Reader and do the transformation there.

I think the general gist of what you're suggesting is augmenting the functionality on one of the "halves" returned by calling split() on the TcpStream, rather than doing it by implementing a new future. Assuming I chose to modify the ReadHalf, I think what I'd have to do is provide another implementation of the poll_fill_buf() function defined by the AsyncBufRead trait.

However, I keep thinking of this in "object oriented" terms, where you create a "subclass" that "overrides methods" and I don't think that's the right way to conceptualize it. There's some basic Rust technique that I'm unfamiliar with here: when you "wrap" a struct with the intent of changing some behavior, what are the actual mechanics of doing that?

You make a struct that implements a trait and contains another object that it delegates to:

struct TransformingReader<R>(R);

impl<R> AsyncRead for TransformingReader<R>
where
    R: AsyncRead,
{
    // ...
}
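
To make the mechanics concrete, here is a minimal sketch of the same wrap-and-delegate pattern using the synchronous `std::io::Read` trait, chosen only because it needs no external crates. `UppercaseReader` and `uppercase_all` are hypothetical names for illustration; the async version has the same shape but a different method signature.

```rust
use std::io::{self, Cursor, Read};

/// Newtype wrapper: owns an inner reader and delegates to it.
struct UppercaseReader<R>(R);

impl<R: Read> Read for UppercaseReader<R> {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        // Delegate the actual read to the wrapped reader.
        let n = self.0.read(buf)?;
        // Transform only the bytes that were just filled in.
        buf[..n].make_ascii_uppercase();
        Ok(n)
    }
}

/// Hypothetical helper for the demo: read everything through the wrapper.
fn uppercase_all(input: &[u8]) -> io::Result<Vec<u8>> {
    let mut reader = UppercaseReader(Cursor::new(input));
    let mut out = Vec::new();
    reader.read_to_end(&mut out)?;
    Ok(out)
}

fn main() -> io::Result<()> {
    let out = uppercase_all(b"hello, proxy")?;
    println!("{}", String::from_utf8_lossy(&out));
    Ok(())
}
```

There is no subclassing involved: the wrapper is a separate type that happens to implement the same trait, so anything that accepts a `Read` accepts the wrapper too.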

I tried a couple of things, neither of which worked out.

First, I was wondering if you really meant that the impl should be for AsyncBufRead not AsyncRead. The type constraint is still AsyncRead which seemed reasonable. So I tried this:

struct TransformingReader<R>(R);
impl<'a, R> AsyncBufRead for TransformingReader<R>
where
    R: AsyncRead,
{
    fn poll_fill_buf(self: Pin<&'a mut Self>, cx: &mut Context) -> Poll<Result<&'a [u8], Error>> {
        self.0.poll_fill_buf(cx)
    }

    fn consume(self: Pin<&mut Self>, amt: usize) {
        self.0.consume(amt)
    }
}

That didn't compile. The error was:

27 | impl<'a, R> AsyncBufRead for TransformingReader<R>
   |             ^^^^^^^^^^^^ the trait `futures_io::if_std::AsyncRead` is not implemented for `TransformingReader<R>`

I didn't understand that error message. Doesn't the where R: AsyncRead ensure that the type implements AsyncRead?

Next, I tried to implement AsyncRead. That resulted in a whole mess of errors: lifetime mismatches, the poll_read() method not being found, and others. Implementing AsyncRead doesn't seem necessary, but there's obviously something I'm missing here.

It's complaining that TransformingReader<R>: !AsyncRead because of trait AsyncBufRead: AsyncRead. When a trait has a super-trait like this it's saying that if you implement AsyncBufRead you must also implement AsyncRead.
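
The super-trait rule can be seen in isolation with a small sketch. The traits `Source` and `Peekable` below are hypothetical stand-ins for AsyncRead and AsyncBufRead, not the real definitions; the point is only that the second impl for `Wrapper` is accepted because the first one exists.

```rust
// Hypothetical stand-in for the "base" trait (AsyncRead in the thread).
trait Source {
    fn next_byte(&mut self) -> Option<u8>;
}

// `Peekable: Source` declares Source as a super-trait: every type that
// implements Peekable must also implement Source.
trait Peekable: Source {
    fn peek(&self) -> Option<u8>;
}

struct Wrapper<S>(S);

// Removing this impl makes the `Peekable` impl below fail with
// "the trait `Source` is not implemented for `Wrapper<S>`".
impl<S: Source> Source for Wrapper<S> {
    fn next_byte(&mut self) -> Option<u8> {
        self.0.next_byte()
    }
}

impl<S: Peekable> Peekable for Wrapper<S> {
    fn peek(&self) -> Option<u8> {
        self.0.peek()
    }
}

// Simple in-memory implementor used for the demo.
struct ByteVec {
    data: Vec<u8>,
    pos: usize,
}

impl Source for ByteVec {
    fn next_byte(&mut self) -> Option<u8> {
        let b = self.data.get(self.pos).copied();
        if b.is_some() {
            self.pos += 1;
        }
        b
    }
}

impl Peekable for ByteVec {
    fn peek(&self) -> Option<u8> {
        self.data.get(self.pos).copied()
    }
}

fn main() {
    let mut w = Wrapper(ByteVec { data: vec![10, 20], pos: 0 });
    println!("{:?} {:?}", w.peek(), w.next_byte());
}
```

Note that the `where R: AsyncRead` bound in the question constrains the inner type `R`; it says nothing about whether the wrapper itself implements the trait.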

How to do this depends on what sort of transform you want to do. If you're transforming delimited messages (e.g. every message needs to end with a new-line) then I would use something like futures-codec to convert the IO stream (AsyncRead + AsyncWrite) into a message stream (Stream + Sink), then apply the conversion per-message using StreamExt::map before forwarding between the two.

If your conversion is streaming instead then you're on the right track with implementing AsyncRead. AsyncRead is not a trivial trait to implement though, as it interacts with pinning (probably why you're getting method not found errors). The easiest solution is to require the type you wrap to be AsyncRead + Unpin; then you can safely convert from a Pin<&mut Self> to a pin to the inner type. Here's an example of a rot13 transform.
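
What follows is an independent sketch of a streaming rot13 wrapper, not the example referred to above. To keep it buildable without external crates it declares a local `AsyncRead` trait that mirrors the shape of `poll_read` (an assumption); `MemReader`, `NoopWake` and `read_once_rot13` are hypothetical helpers that exist only to drive the demo.

```rust
use std::io;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Local stand-in with the same shape as `poll_read` (an assumption made so
// the sketch builds without external crates).
trait AsyncRead {
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>>;
}

// Hypothetical in-memory reader that is always ready; for the demo only.
struct MemReader {
    data: Vec<u8>,
    pos: usize,
}

impl AsyncRead for MemReader {
    fn poll_read(
        self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>> {
        let this = self.get_mut(); // allowed because MemReader is Unpin
        let n = buf.len().min(this.data.len() - this.pos);
        buf[..n].copy_from_slice(&this.data[this.pos..this.pos + n]);
        this.pos += n;
        Poll::Ready(Ok(n))
    }
}

fn rot13(b: u8) -> u8 {
    match b {
        b'a'..=b'z' => (b - b'a' + 13) % 26 + b'a',
        b'A'..=b'Z' => (b - b'A' + 13) % 26 + b'A',
        _ => b,
    }
}

struct Rot13Reader<R>(R);

impl<R: AsyncRead + Unpin> AsyncRead for Rot13Reader<R> {
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>> {
        // R: Unpin lets us build a fresh Pin<&mut R> for the inner reader.
        let result = Pin::new(&mut self.0).poll_read(cx, buf);
        if let Poll::Ready(Ok(n)) = &result {
            // Transform only the bytes the inner reader just wrote.
            for b in &mut buf[..*n] {
                *b = rot13(*b);
            }
        }
        result
    }
}

// Waker that does nothing, so the reader can be polled by hand.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Hypothetical driver: polls the wrapper once and returns what it produced.
fn read_once_rot13(input: &[u8]) -> Vec<u8> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut reader = Rot13Reader(MemReader { data: input.to_vec(), pos: 0 });
    let mut buf = vec![0u8; input.len()];
    match Pin::new(&mut reader).poll_read(&mut cx, &mut buf) {
        Poll::Ready(Ok(n)) => buf.truncate(n),
        _ => buf.clear(),
    }
    buf
}

fn main() {
    let out = read_once_rot13(b"Hello, proxy");
    println!("{}", String::from_utf8_lossy(&out));
}
```

Because rot13 works byte by byte, the wrapper needs no state of its own. A transform that spans read boundaries (a block cipher, for example) would have to keep a buffer inside the wrapper.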

Having to implement AsyncRead makes sense, but I'm unsure how to do it.

Part of the problem is that I'm not completely clear on what some of the Rust syntax means:

impl<'a, R> AsyncBufRead for TransformingReader<R>
where
    R: AsyncRead,
{
...
}

I understand this to mean: "implement the AsyncBufRead trait for the TransformingReader type, for all instances that take an R that implements AsyncRead". Is that an accurate characterization?

Do I also need to change this construct:

struct TransformingReader<R>(R);

to:

struct TransformingReader<R>(R: AsyncRead);

You did that in your example (also adding + Unpin). Does that mean "TransformingReader can only take things that implement AsyncRead"?

Also, do I need to have the type constraints on both the struct and the impl?

So far this is the whole thing. It does not compile yet.

struct TransformingReader<R>(R);

impl<R> AsyncRead for TransformingReader<R>
where
    R: AsyncRead + Unpin,
{
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context,
        buf: &mut [u8],
    ) -> Poll<Result<usize, Error>> {
        self.0.poll_read(cx)
    }
}

impl<'a, R: AsyncRead> AsyncBufRead for TransformingReader<R>
where
    R: AsyncRead,
{
    fn poll_fill_buf(self: Pin<&'a mut Self>, cx: &mut Context) -> Poll<Result<&'a [u8], Error>> {
        self.0.poll_fill_buf(cx)
    }

    fn consume(self: Pin<&mut Self>, amt: usize) {
        self.0.consume(amt)
    }
}

The compiler error is:

26 | impl<'a, R: AsyncRead> AsyncBufRead for TransformingReader<R>
   |                        ^^^^^^^^^^^^ the trait `std::marker::Unpin` is not implemented for `R`
   |
   = help: consider adding a `where R: std::marker::Unpin` bound
   = note: required because of the requirements on the impl of `futures_io::if_std::AsyncRead` for `TransformingReader<R>`

I don't think that message can be taken literally. Adding + Unpin to the impl<'a, R> AsyncBufRead for TransformingReader<R> results in a lot more errors (the ones I described earlier).

I tried implementing Unpin with:

impl<R> Unpin for TransformingReader<R> {}

Didn't help though. Same error.

Yes, except also with "for any lifetime 'a" which seems suspicious since it's unused.

Yes, it can be useful for giving earlier error messages when trying to construct it (e.g. you could otherwise create a TransformingReader<u32> and will only get an error message saying that it does not implement AsyncRead when you go to use it). Some people recommend keeping the bounds on the data structure as minimal as possible, and just put the bound on the impls (so you would require R: AsyncRead for TransformingReader::new, making it impossible to create an instance of TransformingReader<u32>), but I prefer being upfront about the intended use of the data structure.
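
The two styles can be compared side by side in a small sketch. It uses `std::io::Read` in place of AsyncRead so that it builds on its own, and `Strict` and `Lenient` are hypothetical names for illustration.

```rust
use std::io::{self, Cursor, Read};

// Style 1: the bound sits on the data structure itself.
struct Strict<R: Read>(R);

// The bound has to be repeated on the impl; it is not implied by the struct.
impl<R: Read> Strict<R> {
    fn read_all(&mut self) -> io::Result<Vec<u8>> {
        let mut out = Vec::new();
        self.0.read_to_end(&mut out)?;
        Ok(out)
    }
}

// Style 2: the struct is unbounded; the bound sits on the impl with `new`.
struct Lenient<R>(R);

impl<R: Read> Lenient<R> {
    fn new(inner: R) -> Self {
        Lenient(inner)
    }

    fn read_all(&mut self) -> io::Result<Vec<u8>> {
        let mut out = Vec::new();
        self.0.read_to_end(&mut out)?;
        Ok(out)
    }
}

fn main() -> io::Result<()> {
    let mut strict = Strict(Cursor::new(b"abc".to_vec()));
    let mut lenient = Lenient::new(Cursor::new(b"xyz".to_vec()));
    println!("{:?} {:?}", strict.read_all()?, lenient.read_all()?);
    // Both of these are rejected at compile time because u32 is not Read:
    // let _ = Strict(5u32);
    // let _ = Lenient::new(5u32);
    Ok(())
}
```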

Currently, yes. There are plans to add implied bounds to impls based on the bounds of the type, but that's still unimplemented.


fn poll_fill_buf(self: Pin<&'a mut Self>, cx: &mut Context) -> Poll<Result<&'a [u8], Error>>

This doesn't match the signature given in the trait, which is probably the source of your lifetime errors.

You do want to be using R: AsyncRead + Unpin to make creating the Pin<&mut R> necessary for calling <R as AsyncBufRead>::poll_fill_buf easy. Without it you'll have to either use unsafe code or one of the utility libraries for pin-projection. Using it makes a delegating identity implementation pretty trivial:

struct TransformingReader<R>(R);

impl<R: AsyncRead + Unpin> AsyncRead for TransformingReader<R> {
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context,
        buf: &mut [u8],
    ) -> Poll<Result<usize, Error>> {
        Pin::new(&mut self.0).poll_read(cx, buf)
    }
}

impl<R: AsyncBufRead + Unpin> AsyncBufRead for TransformingReader<R> {
    fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<&[u8], Error>> {
        Pin::new(&mut self.0).poll_fill_buf(cx)
    }

    fn consume(self: Pin<&mut Self>, amt: usize) {
        Pin::new(&mut self.0).consume(amt)
    }
}

Yes, that function signature was wrong. I cut and pasted it from a documentation page somewhere. I wonder if part of the problem is that there are multiple versions of AsyncRead and AsyncBufRead and I'm including the wrong ones.

I tried pasting your example in and it required that the self parameter be mut. I changed them but then ran into a compile error with poll_fill_buf. We could explore that avenue, but I'd like to understand the following problem in isolation:

impl<R: AsyncRead> AsyncRead for TransformingReader<R> {
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context,
        buf: &mut [u8],
    ) -> Poll<Result<usize, Error>> {
        self.0.poll_read(cx)
    }
}

This results in this compiler error:

error[E0599]: no method named `poll_read` found for type `R` in the current scope
  --> src/main.rs:20:16
   |
20 |         self.0.poll_read(cx)
   |                ^^^^^^^^^ method not found in `R`
   |
   = help: items from traits can only be used if the type parameter is bounded by the trait
help: the following trait defines an item `poll_read`, perhaps you need to restrict type parameter `R` with it:
   |
14 | impl<R: futures_io::if_std::AsyncRead + AsyncRead> AsyncRead for TransformingReader<R> {
   |      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

My intent with impl<R: AsyncRead> was to express the idea that R is required to implement the AsyncRead trait. Yet the compiler indicates it doesn't and suggests adding an additional trait bound. Taking its advice literally here results in another error because the if_std module is private.

That compiler error message is actually a bug:

What it should be saying is that poll_read takes self: Pin<&mut Self>, which is why you need to call it via Pin::new(&mut self.0).poll_read(cx, buf) (using Unpin to allow Pin::new).
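
The same receiver rule applies to `std::future::Future::poll`, which makes it possible to show the call pattern with the standard library alone. In this sketch `Doubled`, `NoopWake` and `poll_once` are hypothetical names used only for illustration.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Wrapper future that delegates to an inner future and doubles its output.
struct Doubled<F>(F);

impl<F: Future<Output = u32> + Unpin> Future for Doubled<F> {
    type Output = u32;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<u32> {
        // `self.0.poll(cx)` is rejected: `poll` takes `self: Pin<&mut F>`,
        // and `self.0` is a plain `F`. With F: Unpin, Pin::new re-pins it.
        Pin::new(&mut self.0).poll(cx).map(|v| v * 2)
    }
}

// Waker that does nothing, so a future can be polled by hand.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Hypothetical helper: poll a future exactly once.
fn poll_once<F: Future + Unpin>(mut fut: F) -> Poll<F::Output> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    Pin::new(&mut fut).poll(&mut cx)
}

fn main() {
    let result = poll_once(Doubled(std::future::ready(21)));
    println!("{:?}", result);
}
```

The `mut` on `self` is needed because `&mut self.0` borrows through the pin mutably, which is the same change the thread arrives at for `poll_read`.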


Ah, OK. Well, I don't understand pinning yet, but changing

self.0.poll_read(cx, buf)

to:

Pin::new(&mut self.0).poll_read(cx, buf)

did allow it to compile. I had to add mut to self in the function signature too.

As an aside, I think I was complicating things by attempting to implement the AsyncBufRead trait. Looking at this, it may not be needed. I could do the transform inside the AsyncRead trait itself, although I haven't worked out the details for that yet.

To wrap up, I changed the "newtype" struct into a slightly more complex struct that has a little bit of configurable information. The whole thing looks like this:

struct TransformingReader<'a> {
    msg: String,
    reader: &'a mut ReadHalf<TcpStream>,
}

impl<'a> TransformingReader<'a> {
    fn new(msg: &str, reader: &'a mut ReadHalf<TcpStream>) -> Self {
        Self {
            msg: msg.into(),
            reader,
        }
    }
}

impl<'a> AsyncRead for TransformingReader<'a> {
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context,
        buf: &mut [u8],
    ) -> Poll<Result<usize, Error>> {
        println!("message: {}", self.msg);
        Pin::new(&mut self.reader).poll_read(cx, buf)
    }
}

@sfackler and @Nemo157: thank you for your help on this.