Implementing tokio's JoinHandle for wasm

Howdy. For learning purposes, I'm trying to get a library that relies on tokio's runtime to work in the browser with wasm. One of the pieces it relies on is JoinHandle, and the only method it needs on JoinHandle is abort. I know we can't actually abort a task in the browser, so I just want to support the ability to stop listening for completion. I've got an implementation that works without me having to write any unsafe code by hand, but I sort of feel like it's a monstrosity and might cause lockups or other unintended consequences in a real-world scenario.

I'm wondering if anyone with more knowledge of this kind of lower-level async/unsafe code has suggestions for a better, more reliable implementation.

Below is what I came up with:

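use std::any::Any;
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};

use pin_project::pin_project;
use sync_wrapper::SyncWrapper;
use tokio::sync::oneshot; // happily surprised that a lot of tokio is just async abstractions that don't rely on the runtime.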

#[pin_project]
pub struct JoinHandle<T> {
    result: Arc<Mutex<Option<Result<T, JoinError>>>>,

    #[pin]
    receiver: oneshot::Receiver<T>,
}

impl<T> Future for JoinHandle<T> {
    type Output = Result<T, JoinError>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.project();

        // A result stored by abort() takes priority over the receiver.
        if let Ok(mut result) = this.result.lock() {
            if let Some(result) = result.take() {
                return Poll::Ready(result);
            }
        }

        // Otherwise forward the receiver's state. A dropped sender means the
        // task never produced a value, so report it as cancelled.
        match this.receiver.poll(cx) {
            Poll::Ready(Ok(value)) => Poll::Ready(Ok(value)),
            Poll::Ready(Err(_)) => Poll::Ready(Err(JoinError::Cancelled)),
            Poll::Pending => Poll::Pending,
        }
    }
}

impl<T> JoinHandle<T> {
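    /// Marks the handle as cancelled. The underlying future keeps running;
    /// the handle just stops waiting for its result.
    pub fn abort(&self) {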
        if let Ok(mut guard) = self.result.lock() {
            *guard = Some(Err(JoinError::Cancelled));
        }
    }
}

#[derive(Debug)]
pub enum JoinError {
    Cancelled,
    Panic(SyncWrapper<Box<dyn Any + Send + 'static>>),
}

impl JoinError {
    pub fn is_cancelled(&self) -> bool {
        matches!(self, JoinError::Cancelled)
    }

    pub fn is_panic(&self) -> bool {
        matches!(self, JoinError::Panic(_))
    }
}
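
One gap I noticed while writing this up: abort() only writes into the mutex, so a task that is already parked on the handle won't be polled again until the oneshot receiver fires. Below is a minimal std-only sketch of a variant that stores the waker and wakes it on abort. The names Completer, channel, finish and WakeFlag are made up for illustration, it ignores panics entirely, and unlike the oneshot version a dropped Completer leaves the handle pending forever, so treat it as a starting point rather than a drop-in replacement.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

#[derive(Debug, PartialEq)]
pub enum JoinError {
    Cancelled,
}

// State shared between the handle and the (hypothetical) completer.
struct Shared<T> {
    result: Option<Result<T, JoinError>>,
    waker: Option<Waker>,
}

pub struct JoinHandle<T> {
    shared: Arc<Mutex<Shared<T>>>,
}

// Hypothetical producer half; the spawned task would call `complete`.
pub struct Completer<T> {
    shared: Arc<Mutex<Shared<T>>>,
}

pub fn channel<T>() -> (Completer<T>, JoinHandle<T>) {
    let shared = Arc::new(Mutex::new(Shared { result: None, waker: None }));
    (Completer { shared: shared.clone() }, JoinHandle { shared })
}

// Stores a result (unless one is already waiting) and wakes the awaiting task.
fn finish<T>(shared: &Mutex<Shared<T>>, result: Result<T, JoinError>) {
    let mut state = shared.lock().unwrap_or_else(|e| e.into_inner());
    // Keep a result that is already stored, so a late completion
    // does not overwrite a pending abort (and vice versa).
    if state.result.is_none() {
        state.result = Some(result);
    }
    let waker = state.waker.take();
    drop(state); // release the lock before waking
    if let Some(waker) = waker {
        waker.wake();
    }
}

impl<T> Completer<T> {
    pub fn complete(self, value: T) {
        finish(&self.shared, Ok(value));
    }
}

impl<T> JoinHandle<T> {
    pub fn abort(&self) {
        finish(&self.shared, Err(JoinError::Cancelled));
    }
}

impl<T> Future for JoinHandle<T> {
    type Output = Result<T, JoinError>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut state = self.shared.lock().unwrap_or_else(|e| e.into_inner());
        match state.result.take() {
            Some(result) => Poll::Ready(result),
            None => {
                // Remember who to wake when complete() or abort() is called.
                state.waker = Some(cx.waker().clone());
                Poll::Pending
            }
        }
    }
}

// Tiny waker that only records that it was woken, for poking at the
// handle without a runtime.
pub struct WakeFlag(pub AtomicBool);

impl Wake for WakeFlag {
    fn wake(self: Arc<Self>) {
        self.0.store(true, Ordering::SeqCst);
    }
}
```

The lock is released before calling wake() so that a waker which polls the handle straight away doesn't run into the same mutex. In the browser everything runs on one thread, so the Mutex is mostly there to keep the types Send + Sync for the library that expects tokio's handle.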

Thanks in advance!
