Need unsafe to infer future lifetime bounds?

I have a structure (in my case, tokio::fs::ReadDir) that implements an asynchronous method (next_entry()) that returns impl Future. I would like to manually wrap this structure in another structure which implements Stream. In my case the easy answer is to use ReadDir::poll_next_entry(), but in the more general case I am curious as to whether what I want to do is actually possible without using unsafe. This post, answered by @alice, seems to imply that unsafe is required, but addresses a slightly different scenario using raw pointers.

This code on Rust playground uses std::mem::transmute() to extend the lifetime of the pinned future. Comment out the transmute on lines 36-38 and the compiler, "cannot infer an appropriate lifetime for lifetime parameter" for the pinned future.

My question is:

  1. Is this use of unsafe correct? My reasoning is that the future's lifetime is bound by the lifetime of self.names, which is bound, by the lifetime of self, which is 'a. But because poll_next() takes self as Pin<&'_ mut Self> the compiler thinks the lifetime of self is '_ and we must use mem::transmute() to extend it.
  2. Is there a preferable way to do this without unsafe?

Try async-stream crate which provides generator-like api to construct impl Stream value.

No, in your case the easy answer is to use ReadDirStream.

No, there are three issues. One is that the drop order is incorrect. You need to drop the future first, because it has a reference to the other field.

Another is that it violates the aliasing rules because creating an &mut NameStream (e.g. for calling poll_next a second time) asserts exclusive access to all fields in the struct, invalidating the reference to the struct in the future. This one is difficult to get around, but it is possible if you put Names behind a Box that you store with raw pointers because an &mut NameStream does not assert exclusive access recursively through raw pointers.

Additionally you have a problem that the struct should not be Unpin, because you have references from one field to another. Though boxing as suggested above avoids this issue.

There's also the fact that you can avoid the lifetime, but it is not a correctness issue.

Avoiding these issues results in this.

It is worth mentioning that some of these issues actually go away because you async fn always completes on first poll..

Yes. The simplest is to use the async-stream crate, but there is another option, which I have illustrated here. You can find this technique in action in the source code of BroadcastStream, though it is worth noting that it uses an ReusableBoxFuture to avoid creating new boxes all the time.

1 Like

Thank you for the explanation as to why the unsafe lifetime extension in my original attempt is invalid and how to fix it with raw pointers. Even better, thank you for sharing this idiomatic solution, which uses only standard Rust (except for Stream) and does not require any unsafe! While I appreciate the convenience offered by tokio_stream::wrappers and async-stream, sometimes it's nice to know how the magic is made.

In case Playground ever goes up in smoke, the gist of the solution is:

// Some other thing we want to wrap in a stream.
struct OtherThing { /* ... */ }
impl OtherThing {
    pub fn async next_thing(&mut self) -> WhateverType { /* ... */ }
}

// Wrap the other thing in a stream.
struct MyStream {
    fut: Pin<Box<dyn Future<Output = (Option<WhateverType>, OtherThing)>>>,
}

// Helper function.
async fn make_fut(mut other_thing: OtherThing) -> (Option<WhateverType>, OtherThing) {
    let next = other_thing.next_thing().await;
    (next, names)
}

// Voila!
impl MyStream {
    fn new(other_thing: OtherThing) -> Self {
        Self {
            fut: Box::pin(make_fut(other_thing)),
        }
    }
}
impl Stream for MyStream {
    type Item = WhateverType;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();
        match this.fut.as_mut().poll(cx) {
            Poll::Ready((item, other_thing)) => {
                this.fut = Box::pin(make_fut(other_thing));
                Poll::Ready(item)
            },
            Poll::Pending => Poll::Pending
        }
    }
}

This topic was automatically closed 90 days after the last reply. We invite you to open a new topic if you have further questions or comments.