Hi there,
I try to implement a message broker to deliver messages from parts of my code to a graphql subscription. The idea is to create an event, send it to the broker, and it maps it through a transformation function and then sends it to the subscribed clients. This is my code:
use std::{
any::{Any, TypeId},
collections::HashMap,
marker::PhantomData,
pin::Pin,
sync::Mutex,
task::{Context, Poll},
};
use futures_channel::mpsc::{self, UnboundedReceiver, UnboundedSender};
use futures_util::{future::ready, Future, FutureExt, Stream, StreamExt};
use once_cell::sync::Lazy;
use slab::Slab;
static SUBSCRIBERS: Lazy<Mutex<HashMap<TypeId, Box<dyn Any + Send>>>> = Lazy::new(Default::default);
struct Senders<T>(Slab<UnboundedSender<T>>);
type Mapper<T, U> = dyn Fn(T) -> Pin<Box<dyn futures_util::Future<Output = Option<U>> + Send>> + Send + Sync + 'static;
struct BrokerStream<T: Sync + Send + Clone + 'static, U: Sync + Send + Clone + 'static> {
id: usize,
rx: UnboundedReceiver<T>,
mapper: Box<Mapper<T, U>>,
}
fn with_senders<T, F, R>(f: F) -> R
where
T: Sync + Send + Clone + 'static,
F: FnOnce(&mut Senders<T>) -> R,
{
let mut map = SUBSCRIBERS.lock().unwrap();
let senders = map
.entry(TypeId::of::<Senders<T>>())
.or_insert_with(|| Box::new(Senders::<T>(Default::default())));
f(senders.downcast_mut::<Senders<T>>().unwrap())
}
impl<T: Sync + Send + Clone + 'static, U: Sync + Send + Clone + 'static> Drop for BrokerStream<T, U> {
fn drop(&mut self) {
with_senders::<T, _, _>(|senders| senders.0.remove(self.id));
}
}
impl<T: Sync + Send + Clone + 'static, U: Sync + Send + Clone + 'static> Stream for BrokerStream<T, U> {
type Item = U;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match self.rx.poll_next_unpin(cx) {
Poll::Ready(Some(msg)) => {
let fut = (self.mapper)(msg);
let mut fut = Box::pin(fut);
fut.as_mut().poll(cx)
}
Poll::Ready(None) => Poll::Ready(None),
Poll::Pending => Poll::Pending,
}
}
}
pub struct SimpleBroker<T>(PhantomData<T>);
impl<T: Sync + Send + Clone + 'static> SimpleBroker<T> {
/// Publish a message that all subscription streams can receive.
pub fn publish(msg: T) {
with_senders::<T, _, _>(|senders| {
for (_, sender) in senders.0.iter_mut() {
sender.start_send(msg.clone()).ok();
}
});
}
/// Subscribe to the message of the specified type and returns a `Stream`.
///
/// The `mapper` closure maps the incoming messages of type `T` to outgoing messages of type `U`.
///
/// If the closure returns `None`, the stream item will be skipped.
pub fn subscribe<U, F>(mapper: F) -> impl Stream<Item = U>
where
T: Sync + Send + Clone + 'static,
U: Sync + Send + Clone + 'static,
F: Fn(T) -> Pin<Box<dyn Future<Output = Option<U>> + Send>> + Send + Sync + 'static,
{
with_senders::<T, _, _>(|senders| {
let (tx, rx) = mpsc::unbounded();
let id = senders.0.insert(tx);
let stream = BrokerStream {
id,
rx,
mapper: Box::new(move |msg| {
let future = mapper(msg);
async move { future.await }.boxed()
}),
};
stream.filter_map(|item| ready(Some(item)))
})
}
}
I deliver messages to it like this:
#[derive(Clone, Debug)]
pub struct MessagesSubscriptionEvent(i64);
SimpleBroker::publish(MessagesSubscriptionEvent(current_user.id));
The code to filter and transform the events looks like this:
SimpleBroker::<MessagesSubscriptionEvent>::subscribe(move |event| {
let db = db.clone();
Box::pin(async move {
if event.0 == current_user.id {
messages::count_messages(current_user.id, None, &db).await.ok()
} else {
None
}
})
})
It looks like the inner await never gets resolved, and I don't get why. What am I doing wrong?
Best regards,
CK