Message broker implementation: inner await never gets resolved

Hi there,

I'm trying to implement a message broker that delivers messages from parts of my code to a GraphQL subscription. The idea is to create an event and send it to the broker, which maps it through a transformation function and then sends the result to the subscribed clients. This is my code:

use std::{
    any::{Any, TypeId},
    collections::HashMap,
    marker::PhantomData,
    pin::Pin,
    sync::Mutex,
    task::{Context, Poll},
};

use futures_channel::mpsc::{self, UnboundedReceiver, UnboundedSender};
use futures_util::{future::ready, Future, FutureExt, Stream, StreamExt};
use once_cell::sync::Lazy;
use slab::Slab;

static SUBSCRIBERS: Lazy<Mutex<HashMap<TypeId, Box<dyn Any + Send>>>> = Lazy::new(Default::default);

struct Senders<T>(Slab<UnboundedSender<T>>);

type Mapper<T, U> = dyn Fn(T) -> Pin<Box<dyn futures_util::Future<Output = Option<U>> + Send>> + Send + Sync + 'static;

struct BrokerStream<T: Sync + Send + Clone + 'static, U: Sync + Send + Clone + 'static> {
    id: usize,
    rx: UnboundedReceiver<T>,
    mapper: Box<Mapper<T, U>>,
}

fn with_senders<T, F, R>(f: F) -> R
where
    T: Sync + Send + Clone + 'static,
    F: FnOnce(&mut Senders<T>) -> R,
{
    let mut map = SUBSCRIBERS.lock().unwrap();
    let senders = map
        .entry(TypeId::of::<Senders<T>>())
        .or_insert_with(|| Box::new(Senders::<T>(Default::default())));
    f(senders.downcast_mut::<Senders<T>>().unwrap())
}

impl<T: Sync + Send + Clone + 'static, U: Sync + Send + Clone + 'static> Drop for BrokerStream<T, U> {
    fn drop(&mut self) {
        with_senders::<T, _, _>(|senders| senders.0.remove(self.id));
    }
}

impl<T: Sync + Send + Clone + 'static, U: Sync + Send + Clone + 'static> Stream for BrokerStream<T, U> {
    type Item = U;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        match self.rx.poll_next_unpin(cx) {
            Poll::Ready(Some(msg)) => {
                let fut = (self.mapper)(msg);
                let mut fut = Box::pin(fut);

                fut.as_mut().poll(cx)
            }
            Poll::Ready(None) => Poll::Ready(None),
            Poll::Pending => Poll::Pending,
        }
    }
}

pub struct SimpleBroker<T>(PhantomData<T>);

impl<T: Sync + Send + Clone + 'static> SimpleBroker<T> {
    /// Publish a message that all subscription streams can receive.
    pub fn publish(msg: T) {
        with_senders::<T, _, _>(|senders| {
            for (_, sender) in senders.0.iter_mut() {
                sender.start_send(msg.clone()).ok();
            }
        });
    }

    /// Subscribe to the message of the specified type and returns a `Stream`.
    ///
    /// The `mapper` closure maps the incoming messages of type `T` to outgoing messages of type `U`.
    ///
    /// If the closure returns `None`, the stream item will be skipped.
    pub fn subscribe<U, F>(mapper: F) -> impl Stream<Item = U>
    where
        T: Sync + Send + Clone + 'static,
        U: Sync + Send + Clone + 'static,
        F: Fn(T) -> Pin<Box<dyn Future<Output = Option<U>> + Send>> + Send + Sync + 'static,
    {
        with_senders::<T, _, _>(|senders| {
            let (tx, rx) = mpsc::unbounded();
            let id = senders.0.insert(tx);

            let stream = BrokerStream {
                id,
                rx,
                mapper: Box::new(move |msg| {
                    let future = mapper(msg);
                    async move { future.await }.boxed()
                }),
            };

            stream.filter_map(|item| ready(Some(item)))
        })
    }
}

I deliver messages to it like this:

#[derive(Clone, Debug)]
pub struct MessagesSubscriptionEvent(i64);

SimpleBroker::publish(MessagesSubscriptionEvent(current_user.id));

The code to filter and transform the events looks like this:

SimpleBroker::<MessagesSubscriptionEvent>::subscribe(move |event| {
            let db = db.clone();

            Box::pin(async move {
                if event.0 == current_user.id {
                    messages::count_messages(current_user.id, None, &db).await.ok()
                } else {
                    None
                }
            })
        })

It looks like the inner await never gets resolved, and I don't get why. What am I doing wrong?

Best regards,
CK

is count_messages() doing some kind of IO? if not, what do you mean by resolve? or do you actually mean the future never got polled (instead of "resolved")?

if you mean the future is not getting polled, that's probably because the "outer" future is not getting polled at all (e.g. the task (top level future) is not spawned in an executor). your publish() function uses the non-blocking start_send() API. by itself, the sender will not dispatch the message to the receivers, it simply puts the data into some kind of queue. if the receiver is not being polled, there's no registered waker for the sender to call wake() on.
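To make the waker point concrete, here is a minimal std-only sketch. It is not the real futures channel: `Shared`, `send`, `poll_recv`, `CountingWake` and `wake_counts` are hypothetical names invented for illustration. It only shows that a non-blocking send enqueues the message and can wake a receiver only if that receiver was polled earlier and left its waker behind.

```rust
use std::collections::VecDeque;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for the state shared by a sender and a receiver.
struct Shared<T> {
    queue: VecDeque<T>,
    waker: Option<Waker>,
}

/// Waker that only counts how often it is woken.
struct CountingWake(AtomicUsize);

impl Wake for CountingWake {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

/// Non-blocking send: enqueue, then wake the receiver if one is registered.
fn send<T>(shared: &Mutex<Shared<T>>, msg: T) {
    let mut state = shared.lock().unwrap();
    state.queue.push_back(msg);
    if let Some(waker) = state.waker.take() {
        waker.wake();
    }
}

/// Receiver side: return a queued message, or register the waker and wait.
fn poll_recv<T>(shared: &Mutex<Shared<T>>, cx: &mut Context<'_>) -> Poll<T> {
    let mut state = shared.lock().unwrap();
    let next = state.queue.pop_front();
    match next {
        Some(msg) => Poll::Ready(msg),
        None => {
            state.waker = Some(cx.waker().clone());
            Poll::Pending
        }
    }
}

/// Returns the wake count after a send with no registered waker, and the
/// wake count after a send that follows a pending poll.
fn wake_counts() -> (usize, usize) {
    let counter = Arc::new(CountingWake(AtomicUsize::new(0)));
    let waker = Waker::from(counter.clone());
    let mut cx = Context::from_waker(&waker);
    let shared = Mutex::new(Shared { queue: VecDeque::new(), waker: None });

    // Nobody has polled yet, so this send has no waker to call.
    send(&shared, 1);
    let before = counter.0.load(Ordering::SeqCst);

    // The first poll drains the queue, the second one registers the waker.
    assert!(matches!(poll_recv(&shared, &mut cx), Poll::Ready(1)));
    assert!(poll_recv(&shared, &mut cx).is_pending());

    // Now the send finds a registered waker and wakes it.
    send(&shared, 2);
    let after = counter.0.load(Ordering::SeqCst);
    (before, after)
}
```

Calling `wake_counts()` reports the wake count before and after the receiver has been polled. In the broker, the same applies to the stream returned by subscribe: something has to poll it before a publish can wake anything.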

I meant „polled“, sorry – in JS a promise gets resolved, in Rust a future gets polled, I always confuse the terms.

count_messages uses sea-orm to query the database about the count of read and unread messages for the specified user.

The messages get consumed: when adding dbg! statements in the mapper function, I see that the code reaches the messages::count_messages function call but never seems to return: dbg!(messages::count_messages(current_user.id, None, &db).await.ok()) never outputs anything. But a dbg!(event.0 == current_user.id) directly above the if statement correctly outputs true.

To be more specific: it also reaches the code in the count_messages function until it actually tries to query the database:

    dbg!("before query");
    let rows = query.into_model::<CountQueryRow>().all(db).await?;
    dbg!("after query");

before query appears, but after query doesn't.

Edit: What I wanted to say: so the outer future must be polled, or am I wrong?

in such cases, resolve is the correct term. a future being "resolved" means its poll method returned Poll::Ready.

I see you use the ? operator there, did you check whether the operation indeed never finishes, or whether it just short-circuited because it finished with Err?
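A small std-only sketch of that short-circuit, in case it helps to check for it. `double_input` is a hypothetical stand-in for the query function and the `trace` vector stands in for the dbg! output; neither is part of the code in this thread.

```rust
use std::num::ParseIntError;

/// Hypothetical stand-in for a fallible step such as the database query.
/// `trace` records which points of the function were reached.
fn double_input(input: &str, trace: &mut Vec<&'static str>) -> Result<i32, ParseIntError> {
    trace.push("before parse");
    // On `Err`, the `?` operator returns from the function right here,
    // so the line below is never reached.
    let n: i32 = input.parse()?;
    trace.push("after parse");
    Ok(n * 2)
}
```

With an input that fails to parse, only "before parse" ends up in `trace`, which looks just like an operation that never finishes. Inspecting the returned `Result` at the call site (instead of discarding the error with `.ok()`) tells the two cases apart.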

if it indeed never finishes, I assume the reactor is not doing its thing. I'm not familiar with sea-orm, do they have some initialization API that returns some kind of async task that you need to spawn on an executor (and you forgot to)?

sea-orm is correctly initialized, I can do database queries and get the expected results in other contexts.

This is an axum project, so it uses tokio, and I annotated my main function with #[tokio::main], which, as far as I understand, should be enough?

Ok, I think I found the problem. It looks like the future simply didn't get enough time to run and was dropped before it resolved. When I change the implementation to this:

struct BrokerStream<T: Sync + Send + Clone + 'static + Debug, U: Sync + Send + Clone + 'static + Debug> {
    id: usize,
    rx: UnboundedReceiver<T>,
    mapper: Box<Mapper<T, U>>,
    current_future: Option<Pin<Box<dyn Future<Output = Option<U>> + Send>>>,
}

impl<T: Sync + Send + Clone + 'static + Debug, U: Sync + Send + Clone + 'static + Debug> Stream for BrokerStream<T, U> {
    type Item = U;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        loop {
            if let Some(future) = self.current_future.as_mut() {
                match future.as_mut().poll(cx) {
                    Poll::Ready(Some(item)) => {
                        self.current_future = None;
                        return Poll::Ready(Some(item));
                    }
                    Poll::Ready(None) => {
                        self.current_future = None;
                    }
                    Poll::Pending => {
                        return Poll::Pending;
                    }
                }
            } else {
                match self.rx.poll_next_unpin(cx) {
                    Poll::Ready(Some(msg)) => {
                        let fut = (self.mapper)(msg);
                        self.current_future = Some(fut);
                    }
                    Poll::Ready(None) => return Poll::Ready(None),
                    Poll::Pending => return Poll::Pending,
                }
            }
        }
    }
}

It works.

ah, I see, that makes sense. in your original code, the mapper is called just once when the channel receiver yields an item, and the future returned by the mapper is polled exactly once and then is dropped, which essentially starts the IO operation then cancels it immediately.
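The difference can be reproduced without any IO. Below is a minimal std-only sketch, assuming a hypothetical future `TwoStep` that returns Pending on its first poll (like an operation that has just been started) and Ready on the second. `NoopWake`, `poll_once_then_drop` and `keep_and_poll_again` are likewise made up for the illustration.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Waker that does nothing; enough for polling by hand.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Hypothetical future that is pending on the first poll and ready on the
/// second. A real future would arrange a wake-up before returning Pending;
/// this sketch skips that because it is only ever polled by hand.
struct TwoStep {
    polls: u32,
}

impl Future for TwoStep {
    type Output = &'static str;

    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
        self.polls += 1;
        if self.polls >= 2 {
            Poll::Ready("done")
        } else {
            Poll::Pending
        }
    }
}

/// Mirrors the original `poll_next`: create the future, poll it once and
/// let it go out of scope.
fn poll_once_then_drop() -> Option<&'static str> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(TwoStep { polls: 0 });
    let result = match fut.as_mut().poll(&mut cx) {
        Poll::Ready(value) => Some(value),
        Poll::Pending => None,
    };
    result // `fut` is dropped here, which cancels the unfinished work
}

/// Mirrors the fixed version: keep the future in a slot between polls.
fn keep_and_poll_again() -> Option<&'static str> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut current: Option<Pin<Box<TwoStep>>> = Some(Box::pin(TwoStep { polls: 0 }));
    for _ in 0..2 {
        if let Some(fut) = current.as_mut() {
            if let Poll::Ready(value) = fut.as_mut().poll(&mut cx) {
                return Some(value);
            }
        }
    }
    None
}
```

`poll_once_then_drop()` never sees the result, while `keep_and_poll_again()` does, because the same future instance is still around for the second poll. The `current_future` field in the fixed `BrokerStream` plays the role of the `current` slot here.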

it's always tricky to write a poll method correctly, that's why we have the async/await sugar and let the compiler implement poll for us. unfortunately, we don't (yet?) have the compiler do the same transform for Stream. but we can at least use some of the stream combinators to make it (hopefully) less error-prone. for example, using the then combinator from the futures-util crate or the futures-lite crate, the subscribe function can be written as follows (notice how you can get rid of the BrokerStream, as well as some of the indirections!):

    pub fn subscribe<U, F, Fut>(mapper: F) -> impl Stream<Item = U>
    where
        T: Sync + Send + Clone + 'static,
        U: Sync + Send + Clone + 'static,
        F: Fn(T) -> Fut + Send + Sync + 'static,
        Fut: Future<Output = Option<U>> + Send,
    {
        with_senders::<T, _, _>(move |senders| {
            let (tx, rx) = mpsc::unbounded();
            let _id = senders.0.insert(tx);

            // your original implementation has an early stop mechanism:
            // when the mapper function returns `None`, the whole stream stops.
            // this is emulated using the `take_while` combinator.
            // note that `take_while` from `futures-util` and `futures-lite` are slightly
            // different, in that the `futures-util` one expects an async predicate;
            // the returned future must not borrow `item`, hence `ready(...)`
            // (from `futures_util::future`) instead of an `async` block
            rx
                .then(mapper)
                .take_while(|item: &Option<U>| ready(item.is_some()))
                .map(|item: Option<U>| item.unwrap())

            // just a side note: the original (problematic) `poll_next()` can be emulated
            // using the `futures_lite::future::poll_once` combinator, futures_util doesn't have this though
            // see https://docs.rs/futures-lite/latest/futures_lite/future/fn.poll_once.html
            /*
            rx
                .then(move |item: T| {
                    let fut = mapper(item);
                    async {
                        match poll_once(fut).await {
                            Some(resolved) => resolved,
                            None => None,
                        }
                    }
                })
                .take_while(|item: &Option<U>| ready(item.is_some()))
                .map(|item: Option<U>| item.unwrap())
            */
        })
    }

so the takeaway is: use combinators and avoid manually implementing Stream (and Future, for that matter) whenever possible.

Very interesting. Thank you for the insights. Based on your suggestions I came up with this code, what do you think?

use std::{
    any::{Any, TypeId},
    collections::HashMap,
    fmt::Debug,
    marker::PhantomData,
    pin::Pin,
    sync::Mutex,
};

use futures::{
    channel::mpsc::{self, UnboundedSender},
    future::ready,
    stream::{Stream, StreamExt},
    Future,
};
use once_cell::sync::Lazy;
use slab::Slab;

static SUBSCRIBERS: Lazy<Mutex<HashMap<TypeId, Box<dyn Any + Send>>>> = Lazy::new(Default::default);

struct Senders<T>(Slab<UnboundedSender<T>>);

fn with_senders<T, F, R>(f: F) -> R
where
    T: Sync + Send + Clone + 'static,
    F: FnOnce(&mut Senders<T>) -> R,
{
    let mut map = SUBSCRIBERS.lock().unwrap();
    let senders = map
        .entry(TypeId::of::<Senders<T>>())
        .or_insert_with(|| Box::new(Senders::<T>(Default::default())));
    f(senders.downcast_mut::<Senders<T>>().unwrap())
}

struct OnDrop<T: Sync + Send + Clone + 'static> {
    id: usize,
    _marker: PhantomData<T>,
}

impl<T: Sync + Send + Clone + 'static> Drop for OnDrop<T> {
    fn drop(&mut self) {
        with_senders::<T, _, _>(|senders| {
            senders.0.remove(self.id);
        });
    }
}

pub struct SimpleBroker<T>(PhantomData<T>);

impl<T: Sync + Send + Clone + 'static> SimpleBroker<T> {
    pub fn publish(msg: T) {
        with_senders::<T, _, _>(|senders| {
            for (_, sender) in senders.0.iter_mut() {
                sender.start_send(msg.clone()).ok();
            }
        });
    }

    pub fn subscribe<U, F>(mapper: F) -> impl Stream<Item = U>
    where
        T: Sync + Send + Clone + 'static + Debug,
        U: Sync + Send + Clone + 'static + Debug,
        F: Fn(T) -> Pin<Box<dyn Future<Output = Option<U>> + Send>> + Send + Sync + 'static,
    {
        with_senders::<T, _, _>(|senders| {
            let (tx, rx) = mpsc::unbounded();
            let id = senders.0.insert(tx);

            let on_drop = OnDrop::<T> {
                id,
                _marker: PhantomData,
            };

            let rx = rx.then(mapper).filter_map(ready);

            // Use `rx.fuse()` to ensure that the stream is fused,
            // and use `rx.map()` to ensure that the OnDrop struct is kept alive
            rx.fuse().map(move |item| {
                let _ = &on_drop;
                item
            })
        })
    }
}

I wasn't able to get rid of a custom destructor, so I created a dummy struct and implemented Drop on it to ensure that the subscriber gets removed.

glad to know it works for you. as for the destructor, it's totally fine and actually quite common to roll your own guard type. just FYI, there's a crate called scopeguard made specifically for such use cases. despite the "scope" in its name, it is actually just a generic RAII wrapper type which lets users define ad-hoc destructors. the same code can be written using scopeguard like so:

    let (tx, rx) = mpsc::unbounded();
    let id = senders.0.insert(tx);
    // if the `id` is actually used somewhere, you can simply replace it
    // with a "guarded" id, so you don't need to purposely move it via
    // a dummy capture:
    /*
    let id = scopeguard::guard(id, |id| {
        with_senders::<T, _, _>(|senders| {
            senders.0.remove(id);
        });
    });
    */
    // since the id is not used elsewhere, alternatively, you can just capture `id`
    // into the `drop_fn` closure, and use it to guard some other value
    // that you use anyway, e.g. the mapper function

    let mapper = scopeguard::guard(mapper, move |_mapper| {
        with_senders::<T, _, _>(|senders| {
            senders.0.remove(id);
        });        
    });

    // ScopeGuard implements `Deref`, but it can't possibly impl every generic trait
    // the guarded inner type implements, so we manually move it into a closure and
    // deref it to get the inner mapper
    let rx = rx
        .then(move |item| {
            use core::ops::Deref;
            mapper.deref()(item)
            // or use the deref operator syntax
            /*
            (*mapper)(item)
            */
        })
        .filter_map(ready);

    // Use `rx.fuse()` to ensure that the stream is fused,
    rx.fuse()

and as an added bonus, you can avoid the heap allocation for Pin<Box<dyn Future>> and just use a generic Future bound: since you no longer need to poll it manually, the combinator library will take care of the pin projection for you. (that is, unless you have a specific requirement to erase the type)
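For readers who have not seen the guard pattern before, here is a minimal std-only sketch of the idea behind such a wrapper. It is not scopeguard's actual implementation: `Guard`, `guard` and `registry_sizes` are hypothetical names, and a plain `Vec` behind `Rc<RefCell<..>>` stands in for the global subscriber slab.

```rust
use std::cell::RefCell;
use std::ops::Deref;
use std::rc::Rc;

/// Hypothetical minimal guard: owns a value and hands it to a closure
/// exactly once, when the guard is dropped.
struct Guard<T, F: FnOnce(T)> {
    inner: Option<(T, F)>,
}

/// Hypothetical constructor, loosely modelled on the shape of `scopeguard::guard`.
fn guard<T, F: FnOnce(T)>(value: T, on_drop: F) -> Guard<T, F> {
    Guard { inner: Some((value, on_drop)) }
}

impl<T, F: FnOnce(T)> Deref for Guard<T, F> {
    type Target = T;

    fn deref(&self) -> &T {
        // `inner` is only emptied inside `drop`, so it is always set here.
        &self.inner.as_ref().expect("guard already dropped").0
    }
}

impl<T, F: FnOnce(T)> Drop for Guard<T, F> {
    fn drop(&mut self) {
        if let Some((value, on_drop)) = self.inner.take() {
            on_drop(value);
        }
    }
}

/// Registers id 7 in a stand-in registry, guards it, and returns the
/// registry size while the guard is alive and after it has been dropped.
fn registry_sizes() -> (usize, usize) {
    let registry = Rc::new(RefCell::new(vec![7usize]));
    let cleanup = Rc::clone(&registry);

    // The ad-hoc destructor removes the id from the registry.
    let id = guard(7usize, move |id| {
        cleanup.borrow_mut().retain(|other| *other != id);
    });

    // The guarded value is still usable through `Deref`.
    assert_eq!(*id * 2, 14);

    let while_alive = registry.borrow().len();
    drop(id);
    let after_drop = registry.borrow().len();
    (while_alive, after_drop)
}
```

`registry_sizes()` shows the entry present while the guard lives and gone once it is dropped. That is the same job the `OnDrop` struct and the scopeguard-wrapped mapper do for the subscriber id in the broker.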

really neat! From a very complicated and long solution to a concise and much easier to understand solution. I was indeed able to remove the Pin<Box<dyn Future<...>>>. This is what makes programming real fun. Thank you very much!