Hello,
I am trying to implement an asynchronous request/response pattern in Rust. By asynchronous I mean both the broader sense (messages transmitted over a full-duplex communications channel without any intrinsic ordering semantics) and the use of Rust's async/await syntax and behavior. This is being implemented for a browser-side WASM target, so a single-threaded web client runtime, and the actual communications channel is a WebSocket.
Application code, in response to user UI events, will hit the Transport::request() async method, await-ing the response. Requests have a UUID, which is used to index the future in a HashMap. When a response is received with the same UUID, the future is retrieved, the result stashed, and the future's Waker invoked. See code below.
One obvious problem is in the request() method: it fails to compile with a "map dropped while still borrowed" error, so the final future.await statement cannot be invoked as written (strangely, VS Code does not flag this...).
I'm looking for help on how to go about implementing a request/response pattern like this in idiomatic Rust - I'm very new to the language, and the borrow checker, reference lifetimes, async/await subtleties and the various ways I might try to put these primitives together are all kind of swimming around in my head without having converged on coherent, working patterns.
Thanks.
struct Transport {
    context: RefCell<HashMap<Uuid, Pin<Box<TransportFuture>>>>,
}

impl Transport {
    async fn response(&self, channel: AsyncChannel) {
        let response: TransportResponse = channel.recv().await;
        let message_id: Uuid = response.id();
        match self.context.borrow().get_mut(&message_id) {
            Some(future) => {
                future.value = Some(response);
                if let Some(waker) = future.waker.take() {
                    waker.wake();
                }
            },
            None => ... // log unrecognized message ID
        }
    }

    async fn request(&self, request: TransportRequest) -> TransportResponse {
        let future;
        {
            let mut map = self.context.borrow_mut();
            // send message over transport...
            map.insert(request.id(), Box::pin(TransportFuture::new()));
            future = map.get_mut(&request.id()).unwrap();
        }
        future.await
    }
}

#[derive(Clone)]
struct TransportFuture {
    value: Option<TransportResponse>,
    waker: Option<Waker>,
}

impl TransportFuture {
    pub fn new() -> TransportFuture {
        TransportFuture {
            value: None,
            waker: None,
        }
    }
}

impl Future for TransportFuture {
    type Output = TransportResponse;

    fn poll(mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Self::Output> {
        if self.value.is_some() {
            return Poll::Ready(self.value.take().unwrap());
        }
        self.waker = Some(cx.waker().clone());
        Poll::Pending
    }
}
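
One possible way around the borrow error, offered as a minimal sketch rather than a definitive design: instead of storing the future itself in the map and handing out a reference to it, store a small shared slot behind an Rc<RefCell<...>> and let the future own a second Rc to the same slot. The future then borrows nothing from the map, so the RefCell guard can be released before the await. The names Slot, ResponseFuture, Pending, register and complete are hypothetical, and a u64 and a String stand in for Uuid and TransportResponse so that the sketch needs only the standard library. Rc is assumed to be acceptable because the target is single-threaded.

```rust
use std::cell::RefCell;
use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::task::{Context, Poll, Waker};

// Stand-ins (assumptions): u64 replaces Uuid, String replaces TransportResponse.
type MessageId = u64;
type Response = String;

// State shared between the map entry and the future being awaited.
#[derive(Default)]
struct Slot {
    value: Option<Response>,
    waker: Option<Waker>,
}

// The future owns an Rc to its slot, so it holds no borrow of the map.
struct ResponseFuture {
    slot: Rc<RefCell<Slot>>,
}

impl Future for ResponseFuture {
    type Output = Response;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Response> {
        let mut slot = self.slot.borrow_mut();
        match slot.value.take() {
            Some(value) => Poll::Ready(value),
            None => {
                // Keep the most recent waker so complete() can wake this task.
                slot.waker = Some(cx.waker().clone());
                Poll::Pending
            }
        }
    }
}

// Hypothetical replacement for the `context` field of Transport.
#[derive(Default)]
struct Pending {
    slots: RefCell<HashMap<MessageId, Rc<RefCell<Slot>>>>,
}

impl Pending {
    // Registers an ID and returns an owned future. The RefCell guard is a
    // temporary that is dropped before this function returns.
    fn register(&self, id: MessageId) -> ResponseFuture {
        let slot = Rc::new(RefCell::new(Slot::default()));
        self.slots.borrow_mut().insert(id, Rc::clone(&slot));
        ResponseFuture { slot }
    }

    // Delivers a response to the matching future. Returns false when the ID
    // is unknown, which is where an unrecognized message would be logged.
    fn complete(&self, id: MessageId, response: Response) -> bool {
        // Remove the entry first so no map borrow is held while waking.
        let removed = self.slots.borrow_mut().remove(&id);
        let slot = match removed {
            Some(slot) => slot,
            None => return false,
        };
        let waker = {
            let mut slot = slot.borrow_mut();
            slot.value = Some(response);
            slot.waker.take()
        };
        if let Some(waker) = waker {
            waker.wake();
        }
        true
    }
}
```

Under these assumptions, request() would call register() with the request ID, send the message over the WebSocket, and then await the returned future, while the receive loop would call complete() for each incoming response. A oneshot channel, such as the one in futures::channel::oneshot, packages the same idea: the map would hold the Sender and request() would await the Receiver.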