How can I implement Stream for a struct whose async method needs a mutable borrow of itself?

Assume that we have a BinaryReader with an async method read_next:

impl<R: AsyncRead + AsyncSeek + Unpin + Send> BinaryReader<R> {
    // ......
    async fn read_next(&mut self) -> Result<Option<OwnedEvent>, Error> {
        // ......
        todo!()
    }
}

Then I want to implement the Stream trait for it. In this situation I could use an additional field to store the pinned future from the previous call, so that I can poll it again and return its result on the next wake-up.

So is there a way to support Stream directly, without needing a wrapper?

Then I tried to introduce a wrapper:

#[pin_project::pin_project]
struct BinaryReaderWrapper<R: AsyncRead + AsyncSeek + Unpin + Send> {
    #[pin]
    inner: BinaryReader<R>,
    #[pin]
    last_fut: Option<Pin<Box<dyn Future<Output = Result<Option<OwnedEvent>, Error>>>>>
}

impl<R: AsyncRead + AsyncSeek + Unpin + Send> Stream for BinaryReaderWrapper<R> {
    type Item = Result<Option<OwnedEvent>, Error>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Option<Self::Item>> {
        //                 - let's call the lifetime of this reference `'1`
        let mut this = self.project();

        if this.last_fut.is_none() {
            let ret = Box::pin(this.inner.read_next());
            
            *this.last_fut = Some(ret);
            //                    ^^^ lifetime may not live long enough
            //                    ^^^ cast requires that `'1` must outlive `'static`
        }

        let result = ready!(this.last_fut.as_mut().as_pin_mut().unwrap().poll(cx));

        this.last_fut.set(None);

        Poll::Ready(Some(result))
    }
}

But this doesn't compile; the compiler's error messages are marked in the code above.

I can't figure out why the lifetimes mismatch here. It seems the boxed future needs to be 'static, but read_next is called on a mutable reference to inner, which only has a limited lifetime. But why?

When I change Box::pin(this.inner.read_next()) to Box::pin(async_func_without_a_receiver()), everything compiles.

So does anyone know how to write a proper implementation of Stream in this situation?

The reason this doesn't work is that your last_fut field would contain a mutable reference to the struct itself, but fields containing references to the struct itself (or other fields of the same struct) are self-referential and cannot be written safely.
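Where the 'static in the error comes from: a boxed trait object written as Box<dyn Future<Output = T>> in a struct field or type alias defaults to dyn Future<Output = T> + 'static, so the stored future may not borrow anything. The future returned by an async fn taking &mut self captures that borrow, while an async fn with no receiver and no borrowed arguments owns everything it uses, which is why the second version compiled. A minimal sketch of the difference, assuming a hypothetical Counter type in place of BinaryReader and a tiny hand-rolled poll helper instead of a real executor:

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// In a type alias or struct field this means `dyn Future<Output = u32> + 'static`:
// the boxed future is not allowed to borrow anything.
type StaticFut = Pin<Box<dyn Future<Output = u32>>>;

// With an explicit lifetime the boxed future may borrow data for 'a.
type BorrowingFut<'a> = Pin<Box<dyn Future<Output = u32> + 'a>>;

// Hypothetical stand-in for BinaryReader.
struct Counter {
    n: u32,
}

impl Counter {
    // The returned future captures `&mut self`, so it cannot outlive that borrow.
    async fn read_next(&mut self) -> u32 {
        self.n += 1;
        self.n
    }
}

// No receiver and no borrowed arguments: the future owns everything it uses.
async fn without_receiver() -> u32 {
    42
}

fn make_static() -> StaticFut {
    Box::pin(without_receiver())
}

fn make_borrowing(counter: &mut Counter) -> BorrowingFut<'_> {
    // Changing the return type to StaticFut here is rejected with a similar
    // "lifetime may not live long enough" error as in the question.
    Box::pin(counter.read_next())
}

// A waker that does nothing; enough for futures that never wait.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Polls a future once; every future in this demo completes immediately.
fn poll_ready<'a, T>(mut fut: Pin<Box<dyn Future<Output = T> + 'a>>) -> T {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    match fut.as_mut().poll(&mut cx) {
        Poll::Ready(value) => value,
        Poll::Pending => panic!("demo futures never wait"),
    }
}
```

Writing the field as dyn Future + 'a would make the types line up, but 'a would then have to be a borrow of the struct itself, which is exactly the self-referential problem described above.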

To implement Stream by calling an async fn, you need to separate your struct into two structs. The inner struct should have the async fn, and the outer struct should have the actual Stream impl. Then, once you've split the struct into two, you can use the strategy outlined here to implement Stream for the outer struct.
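A minimal std-only sketch of that split, assuming the strategy is to move the inner value into the boxed future and get it back when the future completes. To keep it self-contained, a local Stream trait mirrors the poll_next signature of futures::Stream, and Inner, Reader and collect_all are hypothetical names; real code would implement futures::Stream and run on a proper executor.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Local stand-in for futures::Stream (same poll_next signature).
trait Stream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

// Hypothetical inner struct: owns the state and the async fn.
struct Inner {
    remaining: u32,
}

impl Inner {
    async fn read_next(&mut self) -> Option<u32> {
        if self.remaining == 0 {
            return None;
        }
        self.remaining -= 1;
        Some(self.remaining)
    }
}

// The future owns `Inner` and hands it back together with the item,
// so it borrows nothing and satisfies the implicit 'static bound.
type ReadFut = Pin<Box<dyn Future<Output = (Inner, Option<u32>)>>>;

// Hypothetical outer struct: holds either the idle inner value or the in-flight future.
struct Reader {
    inner: Option<Inner>,
    fut: Option<ReadFut>,
}

impl Reader {
    fn new(inner: Inner) -> Self {
        Reader { inner: Some(inner), fut: None }
    }
}

impl Stream for Reader {
    type Item = u32;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<u32>> {
        // Every field is Unpin, so Reader is Unpin and we may take a plain &mut.
        let this = &mut *self;
        if this.fut.is_none() {
            // No inner value left means the stream has already ended.
            let Some(mut inner) = this.inner.take() else {
                return Poll::Ready(None);
            };
            // Move `inner` into the future instead of borrowing it.
            this.fut = Some(Box::pin(async move {
                let item = inner.read_next().await;
                (inner, item)
            }));
        }
        let (inner, item) = match this.fut.as_mut().unwrap().as_mut().poll(cx) {
            Poll::Ready(out) => out,
            Poll::Pending => return Poll::Pending,
        };
        this.fut = None;
        if item.is_some() {
            // Put `inner` back so the next poll_next can start another read.
            this.inner = Some(inner);
        }
        Poll::Ready(item)
    }
}

// A waker that does nothing.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Drains a stream with a no-op waker. Busy-polling is only acceptable here
// because nothing in this demo ever returns Pending.
fn collect_all<S: Stream + Unpin>(mut stream: S) -> Vec<S::Item> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut out = Vec::new();
    loop {
        match Pin::new(&mut stream).poll_next(&mut cx) {
            Poll::Ready(Some(item)) => out.push(item),
            Poll::Ready(None) => return out,
            Poll::Pending => {}
        }
    }
}
```

Because the future owns Inner instead of borrowing it, it is 'static and fits in a plain Pin<Box<dyn Future>> field with no unsafe code. The cost is one boxed allocation per item; tokio_util::sync::ReusableBoxFuture exists to reuse that allocation when the size and alignment allow it.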


Thank you for your help! I will try this.

Hello alice, sorry to bother you again, I don't want to be annoying. But you are the most knowledgeable expert I've ever met :grin:.

I want to check whether my solution has any mistakes.

Besides the solution you gave (separating the wrapper and inner into two structs), I'm trying to solve it by using a self-referential pointer.

// the wrapper; in this solution I haven't separated them
struct BinaryReaderWrapper<'a, R: AsyncRead + AsyncSeek + Unpin + Send> {
    inner: BinaryReader<R>,
    // pointer to inner
    point_to: std::ptr::NonNull<BinaryReader<R>>,
    last_fut: Option<tokio_util::sync::ReusableBoxFuture<'a, Result<Option<OwnedEvent>, Error>>>,
    _pin: PhantomPinned,
}

impl<'a, R: AsyncRead + AsyncSeek + Unpin + Send> BinaryReaderWrapper<'a, R> {
    fn new(inner: BinaryReader<R>) -> Pin<Box<Self>> {
        let ret = Self {
            inner,
            point_to: std::ptr::NonNull::dangling(),
            last_fut: None,
            _pin: PhantomPinned,
        };

        let mut boxed = Box::pin(ret);
        let inner = std::ptr::NonNull::from(&boxed.inner);

        unsafe {
            boxed.as_mut().get_unchecked_mut().point_to = inner;
        }

        boxed
    }
}

impl<'a, R: AsyncRead + AsyncSeek + Unpin + Send + 'a> Stream for BinaryReaderWrapper<'a, R> {
    type Item = Result<OwnedEvent, Error>;

    fn poll_next(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        let this;

        // I think it is SAFE, because we don't move any field, especially `inner`
        // The only entrance is a pinned, boxed wrapper
        unsafe {
            this = self.get_unchecked_mut();
            if this.last_fut.is_none() {
                // we don't use inner to poll, but just deref the pointer
                let rn = (*this.point_to.as_mut()).read_next();
                this.last_fut = Some(tokio_util::sync::ReusableBoxFuture::new(rn));
            }
        }

        let ret = match ready!(this.last_fut.as_mut().unwrap().poll(cx)) {
            Ok(Some(ev)) => Poll::Ready(Some(Ok(ev))),
            Ok(None) => return Poll::Ready(None),
            Err(err) => return Poll::Ready(Some(Err(err))),
        };

        // if ret is neither None nor an error, load the next value by replacing the last future with a new one.
        this.last_fut.as_mut().unwrap().set(unsafe {(*this.point_to.as_mut()).read_next()});

        ret
    }
}

// the code below illustrates how I use this Stream impl
#[tokio::test]
async fn utf16_plist() {
    use futures::StreamExt;

    let reader = tokio::fs::File::open(&Path::new("./tests/data/utf16_bplist.plist"))
        .await
        .unwrap();

    let streaming_parser = BinaryReader::new(reader);

    let wrapper = BinaryReaderWrapper::new(streaming_parser); // wrapper: Pin<Box<BinaryReaderWrapper<...>>>

    // map and collect are from StreamExt
    let events: Vec<Event> = wrapper.map(|e| e.unwrap()).collect().await;

    println!("{:#?}", events);
}

I have run utf16_plist(), and it seems to work.

So... is this solution sound? If not, what can I improve, or where can I learn more about unsafe Rust?

Thank you~

Why would you do that? Just move inner into the future object and return it when the future completes.

I see many problems:

  1. The inner field is dropped before last_fut, but last_fut holds a reference to inner.
  2. The point_to pointer was created from an immutable reference, but is used to mutate the target.
  3. Your struct is annotated with a lifetime, but it contains no borrows to values outside the struct.
  4. You call poll on last_fut while a mutable reference to the BinaryReaderWrapper struct exists, but last_fut modifies the inner field of the struct, which is not allowed because you are holding a mutable reference to the struct, asserting exclusive access to the struct and all of its fields.

Hmmm, I wrote this just to practise and learn :stuck_out_tongue:. Rust's borrowing and mutability rules really confuse me, so I'm trying to understand them. This code is not written for production.

But what I can't understand is the first point:

The inner field is dropped before last_fut, but last_fut holds a reference to inner.

Why would inner be dropped before last_fut?

inner is owned by the wrapper, and so is last_fut. I mean, inner would only be dropped once the wrapper is dropped, and by then all of the wrapper's fields are useless, so why would it matter what last_fut points to at that point?

I don't know if I'm misunderstanding, but doesn't Pin make sure that inner cannot be moved?

And for point 4:

last_fut modifies the inner field of the struct, which is not allowed because you are holding a mutable reference to the struct, asserting exclusive access to the struct and all of its fields

Do you mean that what I'm doing is unacceptable, or that I just need more checks to ensure exclusive access (e.g. by writing some assert!()s)?

Because when a struct is dropped, its fields are dropped in the order they are written.
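A small std-only demonstration of that rule, using a hypothetical Noisy type that logs its name when dropped, inside a struct with the same field order as BinaryReaderWrapper (inner first, last_fut second). Pin only guarantees that the value is not moved; it does not change the order in which its fields are dropped.

```rust
use std::cell::RefCell;
use std::rc::Rc;

// Hypothetical type that appends its name to a shared log when dropped.
struct Noisy {
    name: &'static str,
    log: Rc<RefCell<Vec<&'static str>>>,
}

impl Drop for Noisy {
    fn drop(&mut self) {
        self.log.borrow_mut().push(self.name);
    }
}

// Same field order as BinaryReaderWrapper: inner first, last_fut second.
#[allow(dead_code)]
struct Wrapper {
    inner: Noisy,
    last_fut: Noisy,
}

// Drops a Wrapper and returns the order in which its fields were dropped.
fn drop_order() -> Vec<&'static str> {
    let log = Rc::new(RefCell::new(Vec::new()));
    let wrapper = Wrapper {
        inner: Noisy { name: "inner", log: Rc::clone(&log) },
        last_fut: Noisy { name: "last_fut", log: Rc::clone(&log) },
    };
    drop(wrapper);
    // Bind first so the RefCell borrow ends before `log` itself is dropped.
    let order = log.borrow().clone();
    order
}
```

Here drop_order() yields ["inner", "last_fut"]: inner is already gone while last_fut is being dropped. Declaring last_fut before inner would reverse the order, though that alone would not fix the other problems listed above.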

What you're doing is not impossible, but it's very finicky and hard to get right.

