Getting rid of Pin<Box<_>> in a (toy) stream adapter

Hi all,

While trying to understand async, Pin, and streams, I wrote this toy adapter:

struct MyIntersperse<S: Stream<Item = String>> {
    source: Pin<Box<S>>,
    interspersed: bool,
}

impl<S: Stream<Item = String>> MyIntersperse<S> {
    fn new(source: S) -> MyIntersperse<S> {
        MyIntersperse {
            source: Box::pin(source),
            interspersed: false,
        }
    }
}

impl<S: Stream<Item = String>> Stream for MyIntersperse<S> {
    type Item = String;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        if !self.interspersed {
            self.interspersed = true;
            Poll::Ready(Some(String::from("---")))
        } else {
            match Pin::new(&mut self.source).poll_next(cx) {
                Poll::Pending => Poll::Pending,
                Poll::Ready(None) => Poll::Ready(None),
                Poll::Ready(Some(s)) => {
                    self.interspersed = false;
                    Poll::Ready(Some(s))
                }
            }
        }
    }
}

My question: why do I need Pin<Box<S>> in struct MyIntersperse? I thought that when self is pinned in poll_next(), I could also pin self.source. But this does not work; I get a compiler error. I "fixed" it by trial and error and ended up with Pin<Box<S>>. Is there a better solution?

Thanks for any help and reading pointers!

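You need to perform a pin projection. Store the source directly in the struct (source: S instead of source: Pin<Box<S>>), then define a projection struct and a project method like this:

struct MyIntersperseProject<'a, S> {
    source: Pin<&'a mut S>,
    interspersed: &'a mut bool,
}

impl<S: Stream<Item = String>> MyIntersperse<S> {
    fn project(self: Pin<&mut Self>) -> MyIntersperseProject<'_, S> {
        // SAFETY: `source` is never moved out of `this`, and it is only
        // handed out re-wrapped in `Pin`, so the pinning guarantee holds.
        let this = unsafe { Pin::into_inner_unchecked(self) };
        MyIntersperseProject {
            source: unsafe { Pin::new_unchecked(&mut this.source) },
            interspersed: &mut this.interspersed,
        }
    }
}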

Then, use it like this:

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
    let this = self.project();

    // After projection `interspersed` is a `&mut bool`, so dereference it.
    if !*this.interspersed {
        *this.interspersed = true;
        Poll::Ready(Some(String::from("---")))
    } else {
        // `this.source` is already a `Pin<&mut S>`, so it can be polled directly.
        match this.source.poll_next(cx) {
            Poll::Pending => Poll::Pending,
            Poll::Ready(None) => Poll::Ready(None),
            Poll::Ready(Some(s)) => {
                *this.interspersed = false;
                Poll::Ready(Some(s))
            }
        }
    }
}

The above uses unsafe, and as long as S is not required to be Unpin, that unsafe code is necessary. That said, crates like pin-project and pin-project-lite provide a macro that writes the unsafe code for you. Both macros generate a .project() method like the one I provided in the above example.
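If you want to try the hand-written projection without pulling in any crates, here is a minimal self-contained sketch. It rests on a few assumptions that are not from the code above: a local `Stream` trait stands in for `futures::Stream`, and `Queue`, `NoopWaker`, `drain`, and `demo` are hypothetical helpers that exist only to drive the adapter. It also uses `std::pin::pin!`, which needs a reasonably recent toolchain.

```rust
use std::collections::VecDeque;
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Local stand-in for `futures::Stream` so the sketch builds with std alone.
trait Stream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

// Hypothetical source stream: yields the queued strings, always ready.
struct Queue(VecDeque<String>);

impl Stream for Queue {
    type Item = String;
    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<String>> {
        Poll::Ready(self.0.pop_front())
    }
}

struct MyIntersperse<S> {
    source: S, // stored inline, no Box
    interspersed: bool,
}

struct MyIntersperseProject<'a, S> {
    source: Pin<&'a mut S>,
    interspersed: &'a mut bool,
}

impl<S> MyIntersperse<S> {
    fn new(source: S) -> Self {
        MyIntersperse { source, interspersed: false }
    }

    fn project(self: Pin<&mut Self>) -> MyIntersperseProject<'_, S> {
        // SAFETY: `source` is never moved out and is only exposed behind `Pin`.
        let this = unsafe { Pin::into_inner_unchecked(self) };
        MyIntersperseProject {
            source: unsafe { Pin::new_unchecked(&mut this.source) },
            interspersed: &mut this.interspersed,
        }
    }
}

impl<S: Stream<Item = String>> Stream for MyIntersperse<S> {
    type Item = String;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<String>> {
        let this = self.project();
        if !*this.interspersed {
            *this.interspersed = true;
            Poll::Ready(Some(String::from("---")))
        } else {
            match this.source.poll_next(cx) {
                Poll::Ready(Some(s)) => {
                    *this.interspersed = false;
                    Poll::Ready(Some(s))
                }
                other => other, // Pending or end of stream pass through unchanged
            }
        }
    }
}

// Waker that does nothing; enough for sources that never return Pending.
struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Polls until the stream ends (or returns Pending, which `Queue` never does).
fn drain<S: Stream>(stream: S) -> Vec<S::Item> {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut stream = pin!(stream); // pinned on the stack by the caller
    let mut out = Vec::new();
    while let Poll::Ready(Some(item)) = stream.as_mut().poll_next(&mut cx) {
        out.push(item);
    }
    out
}

fn demo() -> Vec<String> {
    let source = Queue(VecDeque::from(vec![String::from("a"), String::from("b")]));
    drain(MyIntersperse::new(source))
}
```

Calling `demo()` returns `["---", "a", "---", "b", "---"]`. The trailing separator is a consequence of the toy logic, which emits "---" before it asks the source for the next item. The point of the sketch is that the adapter itself no longer allocates: whoever polls it decides how to pin it, here with `pin!` on the stack.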

"pin projection"... Ok...

Thanks a lot, also for the very fast reply!
