How are you supposed to implement AsyncWrite?

If I have some third-party struct that I want to implement Write for, it's easy to make a wrapper like this:

use std::io::Write;

struct ExternalType;

impl ExternalType {
    fn foo(&mut self, data: &[u8]) {
        todo!()
    }
}

struct WriteWrapper(ExternalType);

impl Write for WriteWrapper {
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        self.0.foo(buf);
        Ok(buf.len())
    }

    fn flush(&mut self) -> std::io::Result<()> {
        Ok(())
    }
}

But if I have an async struct I want to implement AsyncWrite for, I cannot figure out how to do so. I want to do the equivalent of this:

struct AsyncExternalType;

impl AsyncExternalType {
    async fn foo(&mut self, data: &[u8]) {
        todo!()
    }
}

struct AsyncWriteWrapper(AsyncExternalType);

impl tokio::io::AsyncWrite for AsyncWriteWrapper {
    async fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        self.0.foo(buf).await;
        Ok(buf.len())
    }
}

I understand that this isn't how the tokio::io::AsyncWrite trait works; instead I'm supposed to implement poll_write and friends, but those are not async. From what I've found online, I'm supposed to store a future on the wrapper and set it inside the poll function, but I cannot figure out how to do so without running into impossible lifetime issues.

How am I supposed to implement tokio::io::AsyncWrite when all I want to do is call an async method with the buf provided on some other type?

Calling from a poll function into async isn't so easy, and it is often preferable to simply not do that. If you write the actual logic sans io style (see also), then it's relatively straightforward to make it work with AsyncWrite.
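To make the sans-io idea a bit more concrete, here is a minimal sketch using only std. The `FrameEncoder` type and its length-prefixed format are made up for illustration and are not from any library. The point is only that the logic produces bytes and never touches a socket, so whatever I/O layer you use just has to ship `take_output()` somewhere.

```rust
// Hypothetical protocol logic with no I/O at all: it only turns messages
// into bytes. Any sync or async writer can later send those bytes.
#[derive(Default)]
struct FrameEncoder {
    out: Vec<u8>,
}

impl FrameEncoder {
    // Queue one message as a 4-byte big-endian length followed by the payload.
    fn push_message(&mut self, msg: &[u8]) {
        assert!(msg.len() <= u32::MAX as usize, "message too large");
        let len = msg.len() as u32;
        self.out.extend_from_slice(&len.to_be_bytes());
        self.out.extend_from_slice(msg);
    }

    // Hand the queued bytes to the caller, leaving the encoder empty.
    fn take_output(&mut self) -> Vec<u8> {
        std::mem::take(&mut self.out)
    }
}

// Encode a single message and return the bytes an I/O layer would send.
fn encode_demo() -> Vec<u8> {
    let mut enc = FrameEncoder::default();
    enc.push_message(b"hi");
    enc.take_output()
}
```

With this split, the async side reduces to something like `writer.write_all(&enc.take_output()).await` on a type that already implements AsyncWrite, so no hand-written poll function is needed.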

That said, it is possible to call into async using ReusableBoxFuture. A few examples:

If you absolutely must:

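use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::io::AsyncWrite;

// utility types
type PR<T = ()> = Poll<Res<T>>;
type Res<T> = Result<T, std::io::Error>;
type Busy<T> = Pin<Box<dyn Future<Output = T>>>;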
  
pub struct AsyncWriteWrapper<T> {
    // `None` if passed to the future
    pub inner: Option<T>,
    // `None` if idle -> `T` when done
    pub future: Option<Busy<T>>
}
  
impl<T> AsyncWriteWrapper<T> {
    pub fn new(inner: T) -> Self {
        Self { inner: Some(inner), future: None }
    }
}

impl AsyncWrite for AsyncWriteWrapper<External> {
    fn poll_write(
        mut self: Pin<&mut Self>, 
        ctx: &mut Context<'_>, 
        buf: &[u8]
    ) -> PR<usize> { 
        let bytes = buf.len();
        // if yet to be written
        if let Some(mut inner) = self.inner.take() {
            // copy `buf` into an owned `Vec` so the future can keep it
            // for as long as the write is going to take
            let fut_buf = Vec::from(buf);
            // pass it into the future for processing 
            // and return the `inner` when done 
            let future = Box::pin(async move {
                // call your `ExternalType::foo(buf)`
                inner.ext_write(&fut_buf).await;
                println!("{} bytes wrote", bytes);
                inner
            });
            // prep and call the future 
            self.future = Some(future);
            return self.poll_write(ctx, buf);
        }
        // will be available for polling at this point
        let Some(ref mut fut) = self.future 
            else { unreachable!("inner XOR future") };
        // poll/schedule/return
        match fut.as_mut().poll(ctx) {
            Poll::Pending => Poll::Pending,
            Poll::Ready(inner) => {
                self.future.take();
                self.inner = Some(inner);
                println!("{} bytes ready", bytes);
                Poll::Ready(Ok(bytes))
            },
        }
    }
    fn poll_flush(
        self: Pin<&mut Self>, 
        ctx: &mut Context<'_>
    ) -> PR { 
        Poll::Ready(Ok(()))
    }
    fn poll_shutdown(
        self: Pin<&mut Self>, 
        ctx: &mut Context<'_>
    ) -> PR { 
        Poll::Ready(Ok(()))
    }
}

See the full example here.

@00100011 That implementation is wrong. If I call poll_write(b"foo") and you return Pending, and I then later call poll_write(b"bar") and you return Ready(3), then you ended up writing "foo" even though you claim to have written "bar".
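The mismatch can be reproduced by polling by hand. This is a sketch with std only: `LateReady` imitates the "return Ready only once the stored future finishes" design, `SlowWrite` is a hand-written stand-in for the boxed call to the external async method, and `Log`, `NoopWake` and `mismatch_demo` are likewise made-up names. It is not tokio's trait, just the same call shape.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

// Shared record of what the (hypothetical) external sink actually received.
type Log = Arc<Mutex<Vec<u8>>>;
type BoxFut = Pin<Box<dyn Future<Output = ()>>>;

// Waker that does nothing; enough for polling by hand.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Stand-in for the external async write: Pending on the first poll,
// then appends its own copy of the data to the log.
struct SlowWrite {
    log: Log,
    data: Vec<u8>,
    yielded: bool,
}

impl Future for SlowWrite {
    type Output = ();
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        let this = self.get_mut(); // allowed because SlowWrite is Unpin
        if !this.yielded {
            this.yielded = true;
            cx.waker().wake_by_ref();
            return Poll::Pending;
        }
        this.log.lock().unwrap().extend_from_slice(&this.data);
        Poll::Ready(())
    }
}

// Imitates the design under discussion: Ready only when the stored future is done.
struct LateReady {
    log: Log,
    future: Option<BoxFut>,
}

impl LateReady {
    fn poll_write(&mut self, cx: &mut Context<'_>, buf: &[u8]) -> Poll<usize> {
        if self.future.is_none() {
            // Only the buffer of the call that creates the future is copied.
            let fut: BoxFut = Box::pin(SlowWrite {
                log: self.log.clone(),
                data: buf.to_vec(),
                yielded: false,
            });
            self.future = Some(fut);
        }
        let fut = self.future.as_mut().expect("set above");
        match fut.as_mut().poll(cx) {
            Poll::Pending => Poll::Pending,
            Poll::Ready(()) => {
                self.future = None;
                // Reports the length of the current `buf`, whatever it is.
                Poll::Ready(buf.len())
            }
        }
    }
}

// Poll with "foo", get Pending, then poll with "bar".
fn mismatch_demo() -> (Poll<usize>, Poll<usize>, Vec<u8>) {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let log = Log::default();
    let mut w = LateReady { log: log.clone(), future: None };
    let first = w.poll_write(&mut cx, b"foo");
    let second = w.poll_write(&mut cx, b"bar");
    let written = log.lock().unwrap().clone();
    (first, second, written)
}
```

The second call reports three bytes written while the log holds `foo`, not `bar`. Whether a caller ever changes the buffer between polls is a separate question, but the trait does not forbid it.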

Thus the "if you absolutely must" preface. poll_write takes an exclusive Pin<&mut Self> as a receiver, and so do all the write methods of tokio::io::AsyncWriteExt itself. A sequential chain of wrapper.write_all(b"bytes").await? will necessarily be processed in order. This by its very nature will prevent any false positive (Poll::Ready) ahead of its time in poll_write. [1]

Since the original question was, furthermore:

Ensuring a correct implementation for every possible edge case, no matter how unlikely in practice, just didn't seem like a priority. I did try to take a look at the ReusableBoxFuture you mentioned, but couldn't quite wrap my head around it or see how it relates to the OP's post.

Mind sharing the fool-proof way to go about it, by the way? With the explicitly highlighted -

referencing sans-io + semaphores + broadcast channels + a wrapper for an asynchronous Stream, of all things, when all the individual is asking for is one more level of abstraction over an async function so they can call write on it, seems like a bit of overkill.


[1] Unless I'm confusing and misunderstanding some fundamental bits of borrowing and async/await state machinery behind the scenes, as a whole?

You need to return Poll::Ready when you create the future to tell the caller that the bytes passed in that call are the ones that were written. Yes, this means that you return Poll::Ready before the data is fully written, but that is what flush is for.

// in addition to the imports above: `use std::task::ready;`
impl AsyncWrite for AsyncWriteWrapper<External> {
    fn poll_write(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> PR<usize> {
        // finish the previous write, if any, before accepting new data
        if let Some(fut) = self.future.as_mut() {
            let inner = ready!(fut.as_mut().poll(cx));
            self.inner = Some(inner);
            self.future = None;
        }

        let mut inner = self.inner.take().unwrap();
        let fut_buf = Vec::from(buf);
        let bytes = buf.len();
        let future = Box::pin(async move {
            inner.ext_write(&fut_buf).await;
            println!("{} bytes wrote", bytes);
            inner
        });
        self.future = Some(future);
        // the bytes of *this* call are accepted; `poll_flush` completes them
        Poll::Ready(Ok(bytes))
    }
    fn poll_flush(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> PR {
        // drive the stored write to completion and take `inner` back
        if let Some(fut) = self.future.as_mut() {
            let inner = ready!(fut.as_mut().poll(cx));
            self.inner = Some(inner);
            self.future = None;
        }
        Poll::Ready(Ok(()))
    }
    fn poll_shutdown(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> PR {
        // flush first so a pending write is not dropped on shutdown
        self.poll_flush(cx)
    }
}
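Polling this design by hand shows what "that is what flush is for" means in practice. Again a sketch with std only, not tokio's trait: `EarlyReady`, `SlowWrite`, `Log`, `NoopWake` and `flush_demo` are made-up names, and `SlowWrite` stands in for the boxed call to the external async method.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

// Shared record of what the (hypothetical) external sink actually received.
type Log = Arc<Mutex<Vec<u8>>>;
type BoxFut = Pin<Box<dyn Future<Output = ()>>>;

// Waker that does nothing; enough for polling by hand.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Stand-in for the external async write: Pending once, then appends to the log.
struct SlowWrite {
    log: Log,
    data: Vec<u8>,
    yielded: bool,
}

impl Future for SlowWrite {
    type Output = ();
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        let this = self.get_mut(); // allowed because SlowWrite is Unpin
        if !this.yielded {
            this.yielded = true;
            cx.waker().wake_by_ref();
            return Poll::Pending;
        }
        this.log.lock().unwrap().extend_from_slice(&this.data);
        Poll::Ready(())
    }
}

// Accepts the buffer immediately and finishes the work in `poll_flush`.
struct EarlyReady {
    log: Log,
    future: Option<BoxFut>,
}

impl EarlyReady {
    // Drive the stored write, if any, to completion.
    fn poll_pending(&mut self, cx: &mut Context<'_>) -> Poll<()> {
        if let Some(fut) = self.future.as_mut() {
            if fut.as_mut().poll(cx).is_pending() {
                return Poll::Pending;
            }
            self.future = None;
        }
        Poll::Ready(())
    }

    fn poll_write(&mut self, cx: &mut Context<'_>, buf: &[u8]) -> Poll<usize> {
        // The previous write must finish before new data is accepted.
        if self.poll_pending(cx).is_pending() {
            return Poll::Pending;
        }
        let fut: BoxFut = Box::pin(SlowWrite {
            log: self.log.clone(),
            data: buf.to_vec(),
            yielded: false,
        });
        self.future = Some(fut);
        // The length always refers to the buffer of this very call.
        Poll::Ready(buf.len())
    }

    fn poll_flush(&mut self, cx: &mut Context<'_>) -> Poll<()> {
        self.poll_pending(cx)
    }
}

// Write once, then flush twice; snapshot the log before and after flushing.
fn flush_demo() -> (Poll<usize>, Vec<u8>, Poll<()>, Poll<()>, Vec<u8>) {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let log = Log::default();
    let mut w = EarlyReady { log: log.clone(), future: None };
    let write = w.poll_write(&mut cx, b"foo");
    let before = log.lock().unwrap().clone();
    let flush1 = w.poll_flush(&mut cx);
    let flush2 = w.poll_flush(&mut cx);
    let after = log.lock().unwrap().clone();
    (write, before, flush1, flush2, after)
}
```

The write is acknowledged while the log is still empty, and the data only lands once flush has returned Ready. The returned length can no longer refer to a different buffer, at the price of making flush mandatory.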

As for ReusableBoxFuture, I guess you are right that it's not necessary. The reason people use it is that calling Box::pin on every single call is pretty expensive, the cost of which ReusableBoxFuture helps you avoid.

The examples I shared show how to implement a poll-based trait given an async fn as implementation for two different traits. The traits are Sink and Stream instead of AsyncWrite, but I do not have an example of this pattern with AsyncWrite specifically.

That's what it's for then. Thanks for the clarification - might have taken me ages otherwise.

So they aren't quite written here either. "Pick your poison" between an early/late false positive situation overall. If someone were to rely on the documentation alone deeper down the chain:

On success, returns Poll::Ready(Ok(num_bytes_written)).

They might wrongly assume the num_bytes_written is truly representative of the actual work accomplished, when in fact it hasn't even started. The poll_flush becomes mandatory, which might not be obvious for an otherwise non-buffered ExternalType. Either way: less than ideal.

Though I still agree your implementation makes a bit more sense. It's certainly much easier to ensure there's no pending work left than it would be to check whether the buf moved into the Box::pin is the same buf passed to the poll_write in the first place. Even with just a Drop:

#[cfg(debug_assertions)]
impl<T> Drop for AsyncWriteWrapper<T> {
    fn drop(&mut self) {
        // a boxed future cannot be compared with `assert_eq!`, so test the `Option`
        assert!(self.future.is_none(),
            "AsyncWriteWrapper's future still pending: \
            `flush()` is required after `write()`");
    }
}