Implementing a Stream by Hand: Unpin Troubles

I'm trying to implement a Stream by hand, based on the answer to this question. My safety logic seems sound to me, since I only ever access conn or inp after checking that the current future is None (following Alice Ryhl's comment that the UB comes from accessing fields the future refers to while the future still exists).

The problem is that StreamExt::next (and the Next future it returns) requires the stream to be Unpin, and this solution specifically relies on it not being Unpin. I have just learned about the async-stream crate, which would obviously be much easier, but I would rather not pay for message passing if I can avoid it. Is there any way to fix this code? I can't take ownership of the Connection, because connections come from a deadpool::managed::Pool.

Also, I added the Unpin bound on C early in development because I figured it was auto-implemented and couldn't hurt, but I have since learned that was a bad call. I don't think my codebase relies on it, and I could remove the bound with a little work, but that wouldn't fix the current problem.

If it isn't obvious, this is my first time dealing with async at such a low level, so I'm a little lost here.

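use std::marker::PhantomPinned;
use std::pin::Pin;
use std::task::{Context, Poll};

use futures::future::BoxFuture;
use futures::{FutureExt, Stream};

// Connection, GenQueryInp, GenQueryOut, IrodsError and ProtocolEncoding
// come from the surrounding crate (not shown).

pub struct Query<'conn, T, C>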
where
    T: ProtocolEncoding,
    C: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin,
{
    conn: &'conn mut Connection<T, C>,
    inp: GenQueryInp,
    last_page: bool,
    rows_processed: u32,
    current_page: Option<Vec<Vec<String>>>,
    query_future: Option<BoxFuture<'static, Result<GenQueryOut, IrodsError>>>,
    _pinned: PhantomPinned,
}

impl<'conn, T, C> Query<'conn, T, C>
where
    T: ProtocolEncoding,
    C: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin,
{
    pub fn new(conn: &'conn mut Connection<T, C>, inp: GenQueryInp) -> Self {
        Self {
            conn,
            inp,
            last_page: false,
            rows_processed: 0,
            current_page: None,
            query_future: None,
            _pinned: PhantomPinned,
        }
    }
}

impl<'conn, T, C> Stream for Query<'conn, T, C>
where
    T: ProtocolEncoding + Send + Sync,
    C: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin + Send + Sync,
{
    type Item = Result<Vec<String>, IrodsError>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = unsafe { Pin::into_inner_unchecked(self) };

        if this.current_page.is_some() {
            // Do we have a page ready?
            let page = unsafe { this.current_page.as_mut().unwrap_unchecked() };
            match page.pop() {
                // Does this page have another row?
                Some(row) => {
                    this.rows_processed += 1;
                    return Poll::Ready(Some(Ok(row)));
                }
                None => {
                    // If not, start polling for another page
                    this.current_page = None;
                }
            }
        }

        if this.query_future.is_none() {
            if this.last_page {
                return Poll::Ready(None);
            }
            if this.rows_processed >= this.inp.max_rows {
                return Poll::Ready(None);
            }

            let fut: BoxFuture<Result<GenQueryOut, IrodsError>> =
                this.conn.query(&this.inp).boxed();

            let fut: BoxFuture<'static, Result<GenQueryOut, IrodsError>> =
                unsafe { std::mem::transmute(fut) };

            this.query_future = Some(fut);
        }

        match unsafe { this.query_future.as_mut().unwrap_unchecked().poll_unpin(cx) } {
            Poll::Ready(out_result) => match out_result {
                Ok(out) => {
                    let mut page: Vec<Vec<String>> = out.into_page_of_rows();

                    this.query_future = None;

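                    match page.pop() {
                        Some(row) => {
                            this.rows_processed += 1;
                            // Keep the rest of the page for the next call;
                            // otherwise every row after the first is dropped.
                            this.current_page = Some(page);

                            return Poll::Ready(Some(Ok(row)));
                        }
                        None => {
                            this.current_page = None;
                        }
                    }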
                }
                Err(err) => return Poll::Ready(Some(Err(err))),
            },
            Poll::Pending => return Poll::Pending,
        };

        Poll::Ready(None)
    }
}

Update: I've made this, which... compiles? The error I get when compiling mine complains that the inner type of Next is not Unpin, which I assumed came from the PhantomPinned, so now I'm doubly confused.

Further update: I have (re)discovered that the issue actually came from iterating over the stream:

    let query = Query::new(&mut conn, inp);
    pin_mut!(query);

    while let Some(result) = query.next().await {
        println!("{:?}", result);
    }

What I was missing before was the pin_mut! on the stream, which I found in the async-stream docs.
So I suppose my code "works," although I fundamentally don't understand what many of the components mean. As such, I will almost certainly opt for async-stream, but I would still like to understand whether the original code is basically sound, what in the world pin_mut is doing, etc.
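What pin_mut! does can be shown with std::pin::pin!, which has the same effect in the standard library: it moves the value into the current stack frame and shadows the binding with a Pin<&mut T>. A Pin<&mut T> is always Unpin (it is just a pointer), and futures-core implements Stream for pinned pointers like it, which is why query.next() compiles after pinning. The sketch below is a std-only analogue that uses Future instead of Stream; CountDown, drive, NoopWaker and run_demo are hypothetical names, and drive merely stands in for an Unpin-requiring API such as StreamExt::next.

```rust
use std::future::Future;
use std::marker::PhantomPinned;
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// A waker that does nothing; enough for the busy-polling helper below.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Hypothetical `!Unpin` future standing in for `Query`: the
/// `PhantomPinned` field removes the auto-implemented `Unpin`.
struct CountDown {
    remaining: u32,
    _pinned: PhantomPinned,
}

impl Future for CountDown {
    type Output = u32;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<u32> {
        // SAFETY: we only update a plain integer field and never move `*this`.
        let this = unsafe { self.get_unchecked_mut() };
        if this.remaining == 0 {
            return Poll::Ready(42);
        }
        this.remaining -= 1;
        cx.waker().wake_by_ref(); // ask to be polled again
        Poll::Pending
    }
}

/// Hypothetical helper that, like `StreamExt::next`, requires `Unpin`:
/// it calls `Pin::new`, which only exists for `Unpin` targets.
fn drive<F: Future + Unpin>(mut fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = Pin::new(&mut fut).poll(&mut cx) {
            return out;
        }
    }
}

fn run_demo() -> u32 {
    let fut = CountDown { remaining: 3, _pinned: PhantomPinned };
    // `drive(fut)` would not compile: `PhantomPinned` cannot be unpinned.
    // `pin!` pins `fut` in this stack frame and returns `Pin<&mut CountDown>`,
    // which is `Unpin` (it is just a pointer) and is itself a `Future`.
    let pinned = pin!(fut);
    drive(pinned)
}
```

Box::pin(fut) would also satisfy drive, at the cost of a heap allocation. Either way, the pinned value can no longer be moved, which is the guarantee a self-referential type like Query depends on.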

Please don't ever do this (an is_some() check followed by unwrap_unchecked). We have pattern matching in this language:

if let Some(page) = this.current_page.as_mut() {
    // ...
}
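Applied to the first block of poll_next, that means draining the buffered page with if let and no unsafe at all. A minimal sketch, with the current_page field pulled out into a hypothetical free function (next_buffered_row) so it compiles on its own:

```rust
/// Hypothetical free-function version of the first block of `poll_next`:
/// hand out the next buffered row, clearing the page once it is empty.
fn next_buffered_row(current_page: &mut Option<Vec<Vec<String>>>) -> Option<Vec<String>> {
    if let Some(page) = current_page.as_mut() {
        if let Some(row) = page.pop() {
            return Some(row);
        }
        // Page exhausted: forget it so the caller fetches another one.
        *current_page = None;
    }
    None
}
```

As in the original, pop hands rows out last-to-first; buffering the page in a VecDeque and using pop_front would preserve the server's order if that matters.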

In general, there seems to be a lot of unsafe sprinkled throughout the code with no justification. I can't run this under Miri because too much context is missing (the code doesn't compile as-is, and it's not at all obvious what the missing pieces are supposed to be), but you should try it yourself and report back with the instances of UB it finds.

I'm not entirely sure what this is supposed to do, but hand-implementing streams and futures is almost never the right thing to do. If you are trying to fetch items from a stream in batches, just use the existing dedicated combinator.

I don't have a pre-existing stream; I'm trying to build a stream from a single async function that fetches a page of rows from a SQL-like API and reports whether there are more rows to fetch.

Indeed, I don't trust myself to write unsafe code; the only unsafe here is paraphrased as closely as possible from the answer I linked to, except for the unwrap_unchecked calls, which are trivially sound as far as I can tell.

The crux of my question is what to do with code like the code in that answer, but with a type that has to be Unpin, since that bound applies to Next but not, in general, to the poll-based methods.

EDIT: I can't get Miri to run this, because apparently it can't call foreign functions (such as socket), but Miri is happy with this

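This feels like a case for futures::stream::unfold, which would give you a stream of full pages, followed by StreamExt::flatten to turn the stream of pages into a stream of rows (StreamExt::flatten expects each item to be a stream itself, so each page would first go through futures::stream::iter, e.g. via StreamExt::flat_map).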
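The real version needs the futures crate, so here is a std-only sketch of the same shape under stated assumptions: FakeConn is a hypothetical synchronous stand-in for the pooled Connection, unfold_iter is a hypothetical synchronous counterpart of unfold built on std::iter::from_fn, and Iterator::flatten plays the part of the flattening step. The point it illustrates is that the &mut borrow travels inside the state, handed to each step and returned with the next state, rather than being captured by the closure. With unfold, each step's future takes the state by value in the same way, so it can own the &mut Connection for the duration of that step.

```rust
/// Hypothetical stand-in for the pooled `Connection`: serves `total` rows
/// in pages of `page_size` and reports whether more pages remain.
pub struct FakeConn {
    pub next_row: u32,
    pub total: u32,
    pub page_size: u32,
}

impl FakeConn {
    /// Synchronous stand-in for `conn.query(&inp)`: one page plus a
    /// "more rows available" flag.
    fn fetch_page(&mut self) -> (Vec<String>, bool) {
        let end = (self.next_row + self.page_size).min(self.total);
        let page: Vec<String> = (self.next_row..end).map(|i| format!("row {i}")).collect();
        self.next_row = end;
        (page, end < self.total)
    }
}

/// Hypothetical synchronous `unfold`: hand the state to `f` until it
/// returns `None`, yielding each item it produces.
fn unfold_iter<S, T>(init: S, mut f: impl FnMut(S) -> Option<(T, S)>) -> impl Iterator<Item = T> {
    let mut state = Some(init);
    std::iter::from_fn(move || {
        // `take` moves the state (and the `&mut` inside it) into `f`...
        let (item, next) = f(state.take()?)?;
        // ...and `f` hands it back as part of the next state.
        state = Some(next);
        Some(item)
    })
}

/// Pages -> rows. The `&mut FakeConn` lives in the state `(conn, done)`,
/// not in the closure's captures.
fn rows(conn: &mut FakeConn) -> impl Iterator<Item = String> + '_ {
    unfold_iter((conn, false), |(conn, done)| {
        if done {
            return None;
        }
        let (page, more) = conn.fetch_page();
        Some((page, (conn, !more)))
    })
    .flatten()
}
```

The done flag stops the iteration after the page that reported no more rows, much like the last_page field in Query.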

This does seem useful! Thanks for the plug. I'm still a little worried about how the lifetime of my &mut Connection will interact with the async closure, but at least there the compiler will catch me.

I'm curious then: What is a case where you would reach for hand-implementing a Stream?

Personally, only when implementing the stream via functions like unfold, poll_fn or iter (in combination with StreamExt::map, StreamExt::buffer_unordered or StreamExt::buffered) is too hard to get right, and it's easier to hand-implement. The most common case for this is when I have something whose interface is already stream-like to wrap - so I'm just converting calls to Stream::poll_next into calls to inner.poll_next_item or similar.

I apply a similar sort of logic to implementing std::iter::Iterator, FWIW: where possible, I prefer to implement it using functionality from std::iter rather than by hand. I find I'm less likely to make mistakes the compiler doesn't catch this way, and when I do make mistakes, the compiler errors are easier to follow than when I implement Iterator by hand (with the joys of getting lifetimes on structs right).

Do you have a go-to for passing non-owned data into a move closure like this? I can't think of a non-clunky way to do it.

You've not given me enough code to be helpful here.

In general, I write things like:

stream::unfold(
    init,
    {
        let data = &mut data;
        let cloned = cloned.clone();
        move |state| async move {
            …
        }
    }
);

to move references to data into the closure.
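The same block-around-a-move-closure trick works for any closure, not just unfold; a std-only sketch with Iterator::map, where label_rows is a hypothetical function for illustration. One caveat the sync version can't show: a captured &mut is not Copy, so moving it into a fresh async move future on each call of an FnMut closure is rejected by the compiler. Shared references are Copy and do work that way, which is why a &mut Connection usually travels in the unfold state instead.

```rust
/// Hypothetical demo of the "block expression around a `move` closure"
/// pattern: decide per variable what the closure takes ownership of.
fn label_rows(n: u32) -> (Vec<String>, Vec<u32>, String) {
    let mut seen = Vec::new();
    let prefix = String::from("row");

    let rows: Vec<String> = (0..n)
        .map({
            let seen = &mut seen; // move in a reference, not `seen` itself
            let prefix = prefix.clone(); // move in an owned clone
            move |i| {
                seen.push(i);
                format!("{prefix} {i}")
            }
        })
        .collect();

    // The closure (and its borrow of `seen`) is gone after `collect`,
    // so both originals are still usable here.
    (rows, seen, prefix)
}
```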
