How to share a future between decoupled await/wake code

Hello,

I am trying to implement an asynchronous request/response pattern in Rust. By "asynchronous" I mean both that messages travel over a full-duplex communications channel without any intrinsic ordering semantics, and that the code uses Rust's async/await syntax and behavior. The target is browser-side WASM, so a single-threaded web client runtime, and the actual communications channel is a WebSocket.

Application code, in response to user UI events, will hit the Transport::request() async method, await-ing the response. Requests have a UUID, which is used to index the future in a HashMap. When a response is received with the same UUID, the future is retrieved, the result stashed, and the future Waker invoked. See code below.

One obvious problem is in the request() method: it fails to compile with the error "`map` dropped while still borrowed", so the final future.await statement cannot be invoked as written (strangely, VS Code does not flag this...).

I'm looking for help on how to go about implementing a request/response pattern like this in idiomatic Rust - I'm very new to the language, and the borrow checker, reference lifetimes, async/await subtleties and the various ways I might try to put these primitives together are all kind of swimming around in my head without having converged on coherent, working patterns.

Thanks.

struct Transport {
    context: RefCell<HashMap<Uuid, Pin<Box<TransportFuture>>>>,
}

impl Transport {

    async fn response(&self, channel: AsyncChannel) {
        let response: TransportResponse = channel.recv().await;
        let message_id: Uuid = response.id();

        match self.context.borrow_mut().get_mut(&message_id) {
            Some(future) => {
                future.value = Some(response);
                if let Some(waker) = future.waker.take() {
                    waker.wake();
                }
            },
            None => ... // log unrecognized message ID
        }
    }

    async fn request(&self, request: TransportRequest) -> TransportResponse {
        let future;

        {
            let mut map = self.context.borrow_mut();
            // send message over transport...
            map.insert(request.id(), Box::pin(TransportFuture::new()));
            future = map.get_mut(&request.id()).unwrap();
        }

        future.await
    }
}

#[derive(Clone)]
struct TransportFuture {
    value: Option<TransportResponse>,
    waker: Option<Waker>,
}

impl TransportFuture {
    pub fn new() -> TransportFuture {
        TransportFuture {
            value: None,
            waker: None,
        }
    }
}

impl Future for TransportFuture {
    type Output = TransportResponse;

    fn poll(mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Self::Output> {
        if self.value.is_some() {
            return Poll::Ready(self.value.take().unwrap());
        }

        self.waker = Some(cx.waker().clone());
        Poll::Pending
    }
}

The key thing to notice here is that the state that the response future uses is shared between two owners: the channel delivering responses, and the owner of the future polling for one response. This means that you need to enable the sharing by creating an Arc<Mutex<TransportFuture>>, and store that Arc inside your Future implementation and in the map. (That is, you must split the state storage and the Future implementation into two separate structs.)

However, you don't have to actually write that code, because this pattern has already been written: it is a oneshot channel. Oneshot channels are very commonly useful in async programming. (If you're familiar with JavaScript async, a oneshot channel is essentially like a Promise.)

With a oneshot channel, you store the Sender in your HashMap of requests, and you return the Receiver as your future.
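To make the shape of this concrete, here is a minimal std-only sketch of a oneshot channel built on the Arc<Mutex<...>> split described above, with the Sender kept in a map of pending requests. It is an illustration, not the futures_channel implementation: the names Shared, Flag, oneshot() and round_trip() are made up for this example, a plain u64 stands in for the Uuid key, and a String stands in for TransportResponse. In a real project the hand-rolled channel would be replaced by an existing oneshot crate.

```rust
use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

// State shared by both ends: the delivered value and the waiting task's waker.
struct Shared<T> {
    value: Option<T>,
    waker: Option<Waker>,
}

// Kept in the map of pending requests; consumed when the response arrives.
struct Sender<T>(Arc<Mutex<Shared<T>>>);

// Handed back to the caller; this is the type that implements Future.
struct Receiver<T>(Arc<Mutex<Shared<T>>>);

fn oneshot<T>() -> (Sender<T>, Receiver<T>) {
    let shared = Arc::new(Mutex::new(Shared { value: None, waker: None }));
    (Sender(Arc::clone(&shared)), Receiver(shared))
}

impl<T> Sender<T> {
    fn send(self, value: T) {
        let waker = {
            let mut shared = self.0.lock().unwrap();
            shared.value = Some(value);
            shared.waker.take()
        };
        // The lock is released before waking the receiving task.
        if let Some(waker) = waker {
            waker.wake();
        }
    }
}

impl<T> Future for Receiver<T> {
    type Output = T;

    // Simplification: if the Sender is dropped without sending, this stays
    // Pending forever. A production oneshot reports cancellation instead.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
        let mut shared = self.0.lock().unwrap();
        match shared.value.take() {
            Some(value) => Poll::Ready(value),
            None => {
                shared.waker = Some(cx.waker().clone());
                Poll::Pending
            }
        }
    }
}

// Hypothetical helper for the demo: a waker that records that it was woken.
struct Flag(AtomicBool);

impl Wake for Flag {
    fn wake(self: Arc<Self>) {
        self.0.store(true, Ordering::SeqCst);
    }
}

// One request/response round trip, polled by hand instead of by an executor.
fn round_trip() -> (bool, bool, String) {
    let mut pending: HashMap<u64, Sender<String>> = HashMap::new();
    let (tx, rx) = oneshot();
    pending.insert(7, tx); // request side: remember the Sender under the ID
    let mut rx = Box::pin(rx);

    let flag = Arc::new(Flag(AtomicBool::new(false)));
    let waker = Waker::from(Arc::clone(&flag));
    let mut cx = Context::from_waker(&waker);

    let first_pending = rx.as_mut().poll(&mut cx).is_pending();
    // Response side: look up the Sender by ID and deliver the value.
    if let Some(tx) = pending.remove(&7) {
        tx.send("pong".to_string());
    }
    let woken = flag.0.load(Ordering::SeqCst);
    let value = match rx.as_mut().poll(&mut cx) {
        Poll::Ready(v) => v,
        Poll::Pending => String::new(),
    };
    (first_pending, woken, value)
}
```

Because the Sender is removed from the map and consumed by send(), the map never holds a borrow across an .await, which is what the original request() ran into.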

So Arc<Mutex<T>> is relevant even on WASM/web platforms which utilize a single thread and event loop? It's usually presented as a solution for safely sharing data between threads in multi-threaded applications. I'm guessing that for WASM/web platforms, the locking mechanisms just compile to a no-op, and its benefit comes from special compiler treatment which relaxes some of Rust's otherwise strict sharing rules when present?

Thanks for the explanations and recommendation for futures_channel::oneshot - I'll give it a shot.

Sorry, I'm used to writing library code that is thread-safe. For this application you should be able to use Rc<RefCell<T>>. The important parts are Rc or Arc for sharing (between the two ends of the channel) and RefCell or Mutex for interior mutability (to be able to update the state of the channel); Rust ownership-and-borrowing requires you to use these in this application, regardless of whether you use the thread-safe versions.

I'm guessing that for WASM/web platforms, the locking mechanisms just compile to a no-op,

Sort of; Arc essentially becomes an Rc and Mutex becomes a RefCell, in the implementation.

and its benefit comes from special compiler treatment which relaxes some of Rust's otherwise strict sharing rules when present?

Yes but no. As I described above, it's not that we need the thread-safety parts. But we're not relaxing rules; we're choosing specific strategies for obeying the rules that differ from the default single-ownership strategy.

  • Rc or Arc introduces a reference count (which in this application will be between 0 and 2) to allow a value to be shared between multiple owners.
  • RefCell or Mutex introduces run-time tracking of whether the value is locked/borrowed, to allow obtaining &mut access to the value without violating the &mut exclusivity rule.
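
As a small illustration of the two points above, the following sketch shows the reference count and the run-time borrow tracking using only the standard library. The function name sharing_demo and the variable names are invented for this example, and an i32 stands in for the shared channel state.

```rust
use std::cell::RefCell;
use std::rc::Rc;

fn sharing_demo() -> (usize, bool, bool, i32) {
    // Two handles to the same value, like the map's end and the future's end.
    let map_end = Rc::new(RefCell::new(0_i32));
    let future_end = Rc::clone(&map_end);
    let owners = Rc::strong_count(&map_end);

    // While one mutable borrow is alive, a second one is refused at run time.
    let guard = map_end.borrow_mut();
    let blocked = future_end.try_borrow_mut().is_err();
    drop(guard);

    // Once the first borrow is released, the other handle may borrow mutably.
    let allowed = future_end.try_borrow_mut().is_ok();
    *future_end.borrow_mut() += 1;

    // The update made through one handle is visible through the other.
    let value = *map_end.borrow();
    (owners, blocked, allowed, value)
}
```

Calling borrow_mut() instead of try_borrow_mut() in the blocked case would panic rather than return an error, which is the RefCell counterpart of a Mutex lock that cannot be acquired.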

I got this working using: Rc<RefCell<HashMap<Uuid, oneshot::Sender<TransportResponse>>>>

Thank you very much!