Unsafe messaging to running Future

Hi,
I'm currently working on an embedded Rust project where I implement a super-server style server (just a never-ending future) that spawns sub-servers, each of which implements a specific protocol.
These sub-servers then need to receive the subsequent messages.
I think that in safe Rust I'd need a channel or something like an Arc<Mutex<BinaryHeap<Msg>>> for this.

To circumvent the channel and locking mechanisms, I've created the following unsafe code.
Is this actually sound, and could something like this be implemented in safe Rust?

struct Running {
    queue: BinaryHeap<Msg>,
    task: Box<dyn Future<Output = ()>>,
}

impl Running {
    pub fn new<FN: FnOnce(MessageStream) -> Box<dyn Future<Output = ()>>>(
        task_creation_fn: FN,
    ) -> Pin<Box<Self>> {
        #[allow(invalid_value)]
        // SAFETY: initialized below
        let mut this = Box::new(unsafe { MaybeUninit::<Self>::uninit().assume_init() });

        this.queue = BinaryHeap::new();
        this.task = task_creation_fn(MessageStream(NonNull::from_mut(&mut this.queue)));

        Box::into_pin(this)
    }
    fn enqueue(self: Pin<&mut Self>, msg: Msg) {
        // SAFETY: nothing is moved out
        let this = unsafe { self.get_unchecked_mut() };
        this.queue.push(msg);
    }
}
impl Future for Running {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // SAFETY: nothing is moved out
        let task = unsafe { self.map_unchecked_mut(|x| x.task.as_mut()) };
        task.poll(cx)
    }
}

struct MessageStream(NonNull<BinaryHeap<Msg>>);

impl MessageStream {
    fn poll_recv(
        self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
    ) -> Poll<Msg> {
        let this = self.get_mut();
        // SAFETY: the pointer is valid because the corresponding `Running` is pinned and not dropped.
        let queue = unsafe { this.0.as_mut() };

        match queue.pop() {
            Some(x) => Poll::Ready(x),
            None => Poll::Pending,
        }
    }
}

As a first approximation of checking for obvious / clear unsoundness, please try to write an example use-case [a main function] that actually runs the relevant unsafe code path, and execute it through miri. (You can simply use the Tool option on play.rust-lang.org to use miri.)

I didn't read much of your code, but something jumps out.

Calling MaybeUninit::assume_init when the content is not yet fully initialized causes immediate undefined behavior. The invalid_value lint you disabled also tells you this.

You could perhaps populate a Box<MaybeUninit<Self>> and then convert that instead.
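
A minimal sketch of that suggestion, not the poster's actual type: `Holder` and its `name` field are hypothetical stand-ins, and the self-referential pointer is left out on purpose. The allocation stays typed as `MaybeUninit<Holder>` until every field has been written through a raw pointer, so no invalid `Holder` value ever exists.

```rust
use std::collections::BinaryHeap;
use std::mem::MaybeUninit;
use std::ptr::addr_of_mut;

// Hypothetical stand-in for the thread's `Running` struct.
struct Holder {
    queue: BinaryHeap<u32>,
    name: String,
}

fn new_holder() -> Box<Holder> {
    // Allocate uninitialized storage and work with raw pointers only.
    let raw: *mut MaybeUninit<Holder> = Box::into_raw(Box::new(MaybeUninit::uninit()));
    let ptr = raw.cast::<Holder>();
    unsafe {
        // `write` stores a value without reading or dropping the old
        // (uninitialized) contents of the field.
        addr_of_mut!((*ptr).queue).write(BinaryHeap::new());
        addr_of_mut!((*ptr).name).write(String::from("sub-server"));
        // SAFETY: every field is initialized now, and `MaybeUninit<Holder>`
        // has the same layout as `Holder`.
        Box::from_raw(ptr)
    }
}
```

This only addresses the initialization problem. A pointer from one field into another raises the separate aliasing questions discussed further down the thread.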

It seems like the mechanism you're looking for is channels. Tokio offers several flavors with safe APIs.

@quinedot thanks. I had it a bit different in the project and put that in during cleanup for the post ;/

@steffahn I don't understand how I can make self-referential data structures without Miri running into a Stacked Borrows error. I've created an example: Rust Playground

@jorendorff This will be used in a library for an embedded communications protocol. I want to save on the overhead that would come from the Arc, Mutex and Waker used in the channel. I don't need them here, because the queue lives longer than the future, there can't be concurrent access, and the super-server is acting as the runtime.
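
For comparison, here is a sketch of a safe single-threaded alternative that avoids Arc, Mutex and Waker by sharing the queue through Rc<RefCell<...>>. It is not free (it keeps a reference count and a runtime borrow flag), and the names `Sender`, `Receiver` and `queue_pair`, as well as `Msg = u32`, are assumptions made up for illustration.

```rust
use std::cell::RefCell;
use std::collections::BinaryHeap;
use std::rc::Rc;
use std::task::Poll;

// Hypothetical message type; the thread does not show the definition of `Msg`.
type Msg = u32;

// Kept by the super-server to push messages for one sub-server.
struct Sender(Rc<RefCell<BinaryHeap<Msg>>>);

// Handed to the sub-server; plays the role of `MessageStream`.
struct Receiver(Rc<RefCell<BinaryHeap<Msg>>>);

fn queue_pair() -> (Sender, Receiver) {
    let shared = Rc::new(RefCell::new(BinaryHeap::new()));
    (Sender(Rc::clone(&shared)), Receiver(shared))
}

impl Sender {
    fn enqueue(&self, msg: Msg) {
        // The mutable borrow ends with this statement, so it cannot
        // overlap with a borrow taken by the receiver.
        self.0.borrow_mut().push(msg);
    }
}

impl Receiver {
    // No waker is registered here: this assumes the super-server polls the
    // sub-server again after enqueueing, as described in the post.
    fn poll_recv(&mut self) -> Poll<Msg> {
        match self.0.borrow_mut().pop() {
            Some(msg) => Poll::Ready(msg),
            None => Poll::Pending,
        }
    }
}
```

Because the queue is reference counted, a `Receiver` that escapes the task keeps the queue alive instead of dangling.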

Oh, I see. I'll have to think about this one.

Unfortunately, there is not yet any stable, correct way to write self: Pin<&mut Self> code. The officially planned way to do it is UnsafePinned, which is neither stable nor fully implemented yet. The way to do it today is to ensure that your type is !Unpin, for example by adding a field of type PhantomPinned. That is necessary for soundness anyway: Pin pointers don't offer you any pinning guarantee until you do.
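
A minimal sketch of the PhantomPinned approach, using a hypothetical `PinnedQueue` type reduced to just the queue. With the marker field present, safe code can no longer call Pin::get_mut or assign through the pinned pointer; mutation goes through an explicit unsafe projection.

```rust
use std::collections::BinaryHeap;
use std::marker::PhantomPinned;
use std::pin::Pin;

// Hypothetical stand-in for `Running`, reduced to the queue.
struct PinnedQueue {
    queue: BinaryHeap<u32>,
    // Opts the type out of `Unpin`, so safe code cannot obtain
    // `&mut PinnedQueue` from a `Pin<&mut PinnedQueue>`.
    _pin: PhantomPinned,
}

impl PinnedQueue {
    fn new() -> Pin<Box<Self>> {
        Box::pin(PinnedQueue {
            queue: BinaryHeap::new(),
            _pin: PhantomPinned,
        })
    }

    fn enqueue(self: Pin<&mut Self>, msg: u32) {
        // SAFETY: the queue is only mutated in place; the `PinnedQueue`
        // itself is never moved or replaced.
        let this = unsafe { self.get_unchecked_mut() };
        this.queue.push(msg);
    }

    fn len(self: Pin<&Self>) -> usize {
        // Shared access is always allowed through `Deref`.
        self.queue.len()
    }
}
```

A caller would use it as `q.as_mut().enqueue(4)` on a `Pin<Box<PinnedQueue>>`.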

One bit here that's unsound is that you can smuggle the MessageStream out of the task: playground.

Another kind of unsoundness to watch out for is the fact that anyone with a &mut Running can simply assign to it, clobbering the whole thing. When that happens, if any drop impl inside the nested task tries to touch the queue, I think that could happen after it's already been freed.
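
To illustrate the assignment problem in isolation, here is a small sketch with a hypothetical `Plain` type whose fields are all Unpin. Entirely safe code can swap out the value behind a Pin<&mut Plain>, which moves the old value and would leave any raw pointer into its queue dangling.

```rust
use std::pin::Pin;

// Hypothetical type with only `Unpin` fields, so it is `Unpin` itself.
struct Plain {
    queue: Vec<u32>,
}

// Safe code: `Pin<&mut T>` implements `DerefMut` when `T: Unpin`,
// so the whole value can be replaced and the old one moved out.
fn clobber(mut pinned: Pin<&mut Plain>) -> usize {
    let old = std::mem::replace(&mut *pinned, Plain { queue: Vec::new() });
    // `old` now lives at a different address and is dropped at the end
    // of this function.
    old.queue.len()
}
```

For a type that is !Unpin, the `&mut *pinned` line is rejected by the compiler, which is why that property matters for soundness here.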
