Fail to implement `AsyncWrite` on a struct which should implement it

Hi all!

I have a slightly advanced nugget for you. I'm currently working on a PR for tokio and think I'm pretty much done, but the test shows that I am still missing something essential.

As a background, I am trying to implement Asyncwrite for generic Sinks which can cast their types to &[u8] and have sink error type std::io::Error (like a file-like interface). My plan was to build up these conversions in the struct and then implement AsyncWriteon my struct, assuming it has such a sink type wrapped.

I seem to make a logic error somewhere in there. It's somewhat intricate though - because I get rust-analyzer hints that AsyncWriteExt is indeed implemented by struct, but if I want to run cargo test, I get complaints that my type doesn't implement it. I have already been thinking that AsyncWriteExt requires the type to be ?Sized and I couldn't tell whether that was the case or not. I'm running a bit out of ideas and would appreciate what I'm missing here.

The following code block contains both the implementation and the test case that fails.

///
/// PART 1: DEFINITION AND TRAIT IMPL
///



use futures_sink::Sink;
use futures_util::{
    future::{ok, Ready},
    sink::{SinkMapErr, With},
    SinkExt,
};
use pin_project_lite::pin_project;
use std::io;
use std::marker::PhantomData;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::io::AsyncWrite;

pin_project! {
    /// Convert a [`Sink`] of byte chunks into an [`AsyncWrite`].
    /// Bytes are sent into the sink and the sink is flushed once all bytes are sent. If an error occurs during the sending progress,
    /// the number of sent but unflushed bytes are saved in case the flushing operation stays unsuccessful.
    /// For the inverse operation of defining an [`AsyncWrite`] from a [`Sink`] you need to define a [`codec`].
    ///
    /// # Example
    ///
    /// ```
    /// use futures_util::SinkExt;
    /// use std::io::{Error, ErrorKind};
    /// use tokio::io::AsyncWriteExt;
    /// use tokio_util::io::SinkWriter;
    /// use tokio_util::sync::PollSender;
    ///
    /// # #[tokio::main]
    /// # async fn main() -> Result<(), Error> {
    ///  // Construct a channel pair to send data across and wrap a pollable sink.
    ///  // Note that the sink must mimic a writable object, e.g. have `std::io::Error`
    ///  // as its error type.
    ///  let (tx, mut rx) = tokio::sync::mpsc::channel::<&[u8]>(10);
    ///  let mut writer = SinkWriter::new(
    ///      PollSender::new(tx).sink_map_err(|_| Error::from(ErrorKind::Other)),
    ///  );
    ///  // Write data to our interface...
    ///  let data: [u8; 4] = [1, 2, 3, 4];
    ///  let _ = writer.write(&data).await?;
    ///  // ... and receive it.
    ///  assert_eq!(&data, rx.recv().await.unwrap());
    ///
    /// #  Ok(())
    /// # }
    /// ```
    ///
    ///
    /// [`AsyncWrite`]: tokio::io::AsyncWrite
    /// [`Sink`]: futures_sink::Sink
    /// [`codec`]: tokio_util::codec
    #[derive(Debug)]
    pub struct SinkWriter<S, T, E>
    {
        #[pin]
        inner: S,
        _type_marker: PhantomData<T>,
        _error_marker: PhantomData<E>,
    }
}

impl<S, E, T>
    SinkWriter<
        SinkMapErr<
            With<S, T, &[u8], Ready<Result<T, E>>, for<'r> fn(&'r [u8]) -> Ready<Result<T, E>>>,
            fn(E) -> io::Error,
        >,
        T,
        E,
    >
where
    S: Sink<T>,
    E: Into<io::Error> + From<<S as Sink<T>>::Error>,
    T: for<'b> From<&'b [u8]> + ?Sized,
{
    /// Creates a new [`SinkWriter`].
    pub fn new(sink: S) -> Self {
        Self {
            inner: sink
                .with(Self::sink_type_coupler as _)
                .sink_map_err(Self::sink_error_coupler as _),
            _type_marker: PhantomData,
            _error_marker: PhantomData,
        }
    }

    /// Anonymous helper function which provides an awaitable function that
    /// wraps the provided sink into a [`Sink<&[u8]>`].
    fn sink_type_coupler(buf: &[u8]) -> Ready<Result<T, E>> {
        ok::<T, E>(From::from(buf))
    }

    /// Anonymous helper function which provides an awaitable function that
    /// wraps the provided sink erro into a [`io::Error`].
    fn sink_error_coupler(e: E) -> io::Error {
        e.into()
    }

    /// Gets a reference to the underlying sink.
    ///
    /// It is inadvisable to directly write to the underlying sink.
    pub fn get_ref(&self) -> &S {
        self.inner.get_ref().get_ref()
    }

    /// Gets a mutable reference to the underlying sink.
    ///
    /// It is inadvisable to directly write to the underlying sink.
    pub fn get_mut(&mut self) -> &mut S {
        self.inner.get_mut().get_mut()
    }

    /// Consumes this `SinkWriter`, returning the underlying sink.
    ///
    /// It is inadvisable to directly write to the underlying sink.
    pub fn into_inner(self) -> S {
        self.inner.into_inner().into_inner()
    }
}

impl<S, E, T> AsyncWrite for SinkWriter<S, T, E>
where
    for<'r> S: Sink<&'r [u8], Error = io::Error>,
{
    fn poll_write(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<Result<usize, io::Error>> {
        match self.as_mut().project().inner.poll_ready(cx) {
            Poll::Ready(Ok(())) => {
                if let Err(e) = self.as_mut().project().inner.start_send(buf) {
                    Poll::Ready(Err(e))
                } else {
                    Poll::Ready(Ok(buf.len()))
                }
            }
            Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
            Poll::Pending => {
                cx.waker().wake_by_ref();
                Poll::Pending
            }
        }
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
        self.project().inner.poll_flush(cx).map_err(Into::into)
    }

    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
        self.project().inner.poll_close(cx).map_err(Into::into)
    }
}



///
/// PART 2, THE TEST
///



#![warn(rust_2018_idioms)]

use futures_util::SinkExt;
use std::fmt::{self, Debug, Display};
use std::io::{self, Error, ErrorKind};
use tokio::io::AsyncWriteExt;
use tokio_util::io::SinkWriter;
use tokio_util::sync::{PollSendError, PollSender};

#[derive(Debug)]
struct PollSendErrorCoupler<T>(PollSendError<T>);

impl<T: Debug> Display for PollSendErrorCoupler<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        Display::fmt(&self, f)
    }
}

impl<T: Debug> std::error::Error for PollSendErrorCoupler<T> {
}

impl<T> From<PollSendError<T>> for PollSendErrorCoupler<T> {
    fn from(e: PollSendError<T>) -> Self {
        PollSendErrorCoupler(e)
    }
}
impl<T> Into<io::Error> for PollSendErrorCoupler<T> {
    fn into(self) -> io::Error {
        io::Error::from(ErrorKind::BrokenPipe)
    }
}

#[tokio::test]
async fn test_sink_writer() -> Result<(), Error> {
    // Construct a channel pair to send data across and wrap a pollable sink.
    // Note that the sink must mimic a writable object, e.g. have `std::io::Error`
    // as its error type.
    let (tx, mut rx) = tokio::sync::mpsc::channel::<Vec<u8>>(4);
    let writer: SinkWriter<_, _, PollSendErrorCoupler<Vec<u8>>> =
        SinkWriter::new(PollSender::new(tx));

    // Write data to our interface...

    let data: [u8; 4] = [1, 2, 3, 4];
    let _ = writer.write(&data).await; // <- This should work but doesn't....

    // ... and receive it.
    assert_eq!(&data, rx.recv().await.unwrap().as_slice());

    Ok(())
}

I have a suspicion why the trait is not implemented, I don't think the implementation should be fulfilled by any S and T, so I was also trying around with headers like this one, which fails since S is unrestricted:


impl<W, S, E, T> AsyncWrite for SinkWriter<S, T, E>
where
    for<'r> S: Sink<&'r [u8], Error = io::Error>,
    W: Sink<T>,
    E: Into<io::Error> + From<<W as Sink<T>>::Error>,
    T: for<'b> From<&'b [u8]> + ?Sized,
{
    fn poll_write(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
        [...]

But I'm not sure.
Any help would be appreciated a lot!

I'm not sure where the problem is; after pasting youe code into a Playground and fixing the attribute syntax and import errors, it seems to compile. Can you clarify, preferably by updating the linked Playground, what exactly is wrong?

I'd suspect the trait was not imported in.

1 Like

Hi! Thanks for taking a look. As you see, the function is labeled as a test - change this to [tokio::main] and you get an error similar to what i meant.

Ah, I see, thanks. Looking into it.

1 Like

This impl only applies when S is for<'r> Sink<&'r [u8]>.

S is PollSender<Vec<u8>>. Right? So it only (as far as I can tell) implements Sink<Vec<u8>>. Not for<'r> Sink<&'r [u8]>.

Am I missing something?

2 Likes

Yes, you do - tx is a mpsc::sender<Vec<u8>>, so that is sound. Together with the inner with and sink_map_err, this should impl sink for an arbitrary lifetime. Here i also had a potential case - the HRTB I am using here means that the Sink trait bound must be fulfilled for all lifetimes, right? Should that be a specific but arbitrary (hence abstract) lifetime?

This is also not a Vec problem, changing that to &[u8] also leads to the same complaint. I had aome issues with constructing the impl SinkWriter block before, so the types in there should work as they are supposed to.

EDIT: Ah I see what you mean! Yes you're right - if you check the impl SinkWriter block you will see that the first type (S) is not the sink parameter of the “inner sink” but the wrapped and transformed sink (SinkMapErr<With<Sink>>). That should in either case implement the bound given in the trait implementation, or not? That’s my core assumption.

Indeed, I missed the first time around that With changes what Sink is implemented. I believe this is exactly the reason why the code doesn't work.

I think this reduced example shows the same root problem:

fn is_sink<S>(_: S)
where
    for<'r> S: Sink<&'r [u8]>,
{
}

async fn is_with_sink(
    with: With<
        PollSender<Vec<u8>>,
        Vec<u8>,
        &[u8],
        futures_util::future::Ready<Result<Vec<u8>, PollSendErrorCoupler<Vec<u8>>>>,
        for<'r> fn(
            &'r [u8],
        )
            -> futures_util::future::Ready<Result<Vec<u8>, PollSendErrorCoupler<Vec<u8>>>>,
    >,
) {
    is_sink(with);
}

with the error being the much more precise (although perhaps still somewhat obscure)

error: implementation of `futures_sink::Sink` is not general enough
  --> src/lib.rs:79:5
   |
79 |     is_sink(with);
   |     ^^^^^^^^^^^^^ implementation of `futures_sink::Sink` is not general enough
   |
   = note: `With<PollSender<Vec<u8>>, Vec<u8>, &'2 [u8], futures_util::future::Ready<Result<Vec<u8>, PollSendErrorCoupler<Vec<u8>>>>, for<'r> fn(&'r [u8]) -> futures_util::future::Ready<Result<Vec<u8>, PollSendErrorCoupler<Vec<u8>>>>>` must implement `futures_sink::Sink<&'1 [u8]>`, for any lifetime `'1`...
   = note: ...but it actually implements `futures_sink::Sink<&'2 [u8]>`, for some specific lifetime `'2`

More concisely: In With<_, _, &[u8], _, _>, &[u8] is a concrete type with a lifetime determined by the caller of (in the original code) SinkWriter::new. Its lifetime is not higher-kinded. In effect SinkWriter::new requires that T be convertible from a reference of any lifetime, but the type it actually returns is a SinkWriter that can only deal with a reference of some particular lifetime.

Because With is parameterized with the type U, there's nothing you can do about this. But it seems like it could be made to work, because U is only used in a PhantomData and tbh I don't really see why that is necessary. If With dropped the PhantomData along with those unused parameters U and Item, its Sink<U> implementation would still work [1], and what you're trying to do would also work.

Perhaps you could file a bug or PR against futures_util?


  1. because U is a fresh type parameter used in Sink<U>, and Item is an associated type of F, so it doesn't have to be used ↩︎

1 Like

That is interesting. Three questions here:

  1. Can that be salvaged by generalizing the overall target lifetime, e.g. considering for<'a> With<_, _, &'a [u8], _, _>? I assume not, since lifetimes should only be made more restrictive in my intuition.
  2. if not, do you have a suggestion for an alternative implementation strategy? I think I ran into a similar problem when trying to have the conversions inside the trait implementation, as I could not affect the lifetime of buf: &[u8]. I could maybe assume T: Clone but thought that this would work as well. It should..
  3. Why does rust-analyzer not catch this?

To your last point, I can only speculate - However if I had a .with(g).with(f) construction, I'd have to consider that f -> g -> sink is a valid conversion chain, hence With needs to carry type information about both domain and image of those maps. I don't fully understand how you come to this conclusion though tbh - could you explain?

Another thought - if it's about the lifetime of that reference, would it make it easier if I wrap this into Bytes? That's a type I can Deref in the AsyncWrite implementation and I would have something that is maybe a bit easier to work with. Also feels like an addition that comes from me not fully grasping all details of the problem...

No, for<'a> only works with traits (and function pointers) but not generalized types.

Not really. You'd have to avoid using With, though.

No clue.

Hypothetically, With<Si, Fut, F> does carry that type information: Item is part of Fut::Output, and U is the type of the parameter taken by F. In fact, the problem you're dealing with arises precisely because your U needs to be left undetermined, because F is higher ranked in its argument lifetime, so there are multiple Us that could work and not just one.

I am ready to admit that there may be other reasons why U and Item have to be determined eagerly; just, at a glance it seems unnecessary.

1 Like

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.