Subscribe and listen to a dynamic number of `tokio::sync::broadcast` channels?

I need to subscribe and listen to a dynamic list of Tokio broadcast channels. Normally, I could just use a select! macro to do this. However, this list of channels is dynamic and can change at runtime, so the select! macro is not an option.

I've thought of two options, neither of which seems ideal:

  1. Combine multiple channels into a single stream using the futures::stream::select_all function. But this means that if the list of channels changes even slightly, I have to call futures::stream::select_all again and recreate the combined stream. I'm not sure how expensive that is, but I'm guessing it's not cheap.

  2. Spawn a separate Tokio task for each channel I'm subscribed to, adding or dropping tasks as my subscriptions change. Same concern: I may have hundreds of channels to subscribe to, so I'm worried about performance.

Is there a canonical way (or crate) to achieve this? Any advice would be appreciated.


For more context, I have a gRPC server that establishes bidirectional streams with its clients (in the thousands). I need to do this for each stream/client. That is, each stream needs to subscribe to a dynamically changing list of broadcast channels, potentially in the hundreds.

All of these fancy combinators are just taking a list of futures and polling them in accordance with the documentation of Future and Waker.

While Rust has a focus on zero-cost abstractions, if the existing abstractions aren't performant enough for your use case, usually your only options are to bypass the abstraction or to create your own.

100k tasks isn't too out there. You could probably do better, but it's a reasonable place to start. Internally, spawning a task is just a more optimized version of adding a type-erased future to a list that gets checked when the waker says something has changed, so it's probably more efficient than your own attempt to do the same.

If you want to beat spawn(), you have to take advantage of application knowledge; either that you can pack the tasks better by knowing their type, or what order they should be polled, etc... potentially a lot of work!

No, you don't have to do that. The return type of select_all (SelectAll) has a push method for adding more streams.

I don't think this is very different from using futures::stream::select_all performance-wise. Depending on how you do it, using Tokio tasks could involve fewer allocations because SelectAll will allocate every time it returns a message from a receiver, whereas you only call tokio::spawn once per channel even if there are many messages. Even if you forward them to a central place with tokio::sync::mpsc, that channel only allocates once per 32 messages.

I just found out about tokio_stream::StreamMap, which supports both insert and remove. Do you think it'll have better performance compared to spawning Tokio tasks?

That one says it's a Vec internally, so the performance won't be anything special. I'd probably just do tokio::spawn and send stuff through a channel. Or put them all in FuturesUnordered plus a channel. Just make sure you're polling both.

I'm not that good at Rust yet, could you give me a hint as to what that means?

Yeah, here's that in code:

use futures::prelude::*;
use tokio::{select, sync::mpsc};

pub async fn listen_to_many<S, T>(streams: Vec<S>)
where
    S: Stream<Item = T>,
{
    let (sender, mut receiver) = mpsc::channel(100);
    // make it a reference so that it is not moved into the async block below
    let sender = &sender;
    // one forwarding future per stream; each tags its items with the stream index
    let mut unordered: stream::FuturesUnordered<_> = streams
        .into_iter()
        .enumerate()
        .map(|(i, s)| async move {
            let mut s = std::pin::pin!(s);
            while let Some(t) = s.next().await {
                sender.send((i, t)).await.expect("receiver was dropped");
            }
        })
        .collect();

    // keep polling both sides until every stream has ended
    loop {
        select! {
            // ensures the streams are polled; `None` means all of them are done
            done = unordered.next() => {
                if done.is_none() {
                    break;
                }
            }
            recv = receiver.recv() => {
                let (i, t) = recv.expect("sender was dropped (impossible)");
                process(i, t);
            }
        }
    }

    // handle anything still buffered in the channel
    while let Ok((i, t)) = receiver.try_recv() {
        process(i, t);
    }
}

fn process<T>(_i: usize, _t: T) {
    todo!();
}