Receiving updates from various streams concurrently in an async Tokio runtime

I am trying to subscribe to various streaming type services, typically from a gRPC source, and would like to get some validation of the direction I am heading in, or some guidance if there is a better way to approach this.

Below is a simplified example of what my program does. I subscribe to multiple streams and create an mpsc channel for each. I then receive incoming messages from all of these streams concurrently using tokio::select!. Each incoming message needs to be processed by an async function, which I spawn as a new task per message so that the receiving loop isn't held up before it can receive the next message.

EDIT: There is shared state between the tasks that are spawned from the various receivers, which is why I am using the select! setup.

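```rust
use std::error::Error;

use futures::TryStreamExt; // provides `try_next()` on streams of `Result`s
use tokio::runtime::Builder;
use tokio::sync::mpsc::{channel, Receiver};

async fn subscribe(buffer_size: usize, client: Client) -> Result<Receiver<Notification>, Box<dyn Error>> {
    let (sender, receiver) = channel(buffer_size);

    let mut stream = client.subscribe().await?;

    // Drive the stream on a dedicated OS thread with its own runtime.
    std::thread::spawn(move || {
        let runtime = Builder::new_multi_thread()
            .enable_all()
            .build()
            .expect("Creating Tokio runtime");

        runtime.block_on(async move {
            loop {
                let maybe_notification = stream.try_next().await;
                match maybe_notification {
                    Ok(Some(notification)) => {
                        // Stop if the receiver has been dropped.
                        if sender.send(notification).await.is_err() {
                            return;
                        }
                    }
                    // The stream ended (`Ok(None)`) or returned an error.
                    _ => {
                        break;
                    }
                }
            }
        });
    });
    Ok(receiver)
}
```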

```rust
fn main() -> Result<(), Box<dyn Error>> {
    let runtime = Builder::new_multi_thread()
        .enable_all()
        .build()?;

    runtime.block_on(async move {
        let client_1 = Client1::new();
        let client_2 = Client2::new();
        let client_3 = Client3::new();
        // `recv()` takes `&mut self`, so the receivers must be mutable.
        let mut receiver_1 = subscribe(100, client_1).await?;
        let mut receiver_2 = subscribe(100, client_2).await?;
        let mut receiver_3 = subscribe(100, client_3).await?;

        loop {
            tokio::select! {
                // `recv()` yields `None` once a channel is closed; the
                // `Some(..)` pattern disables that branch instead.
                Some(notification) = receiver_1.recv() => {
                    tokio::spawn(process_message_1(notification));
                }
                Some(notification) = receiver_2.recv() => {
                    tokio::spawn(process_message_2(notification));
                }
                Some(notification) = receiver_3.recv() => {
                    tokio::spawn(process_message_3(notification));
                }
                // All three channels are closed.
                else => break,
            }
        }
        Ok::<(), Box<dyn Error>>(())
    })?;
    Ok(())
}
```

This generally works well when the volume of incoming messages isn't too high, but during peaks in activity it can't keep up with the required throughput, and I momentarily fall behind the stream. So, a few general questions:

  1. Is this the best way to approach this situation? Or, instead of having one worker handle incoming messages from all receivers (i.e. the tokio::select! implementation), could I spawn dedicated threads to receive messages from each individual stream/channel?
  2. Are there any improvements I could make, particularly to increase throughput and so reduce the overall latency of receiving incoming updates?

As an aside, I sometimes notice that spawning a task takes much longer than expected (say 3 to 10 ms), which on its own is enough to make me fall behind. Is there a better way to process the messages than simply spawning a new task for each incoming one?

Why use std::thread::spawn and a new runtime Builder instead of making this part of the async function?
Why not spawn one Tokio task per client, each running the code inside subscribe followed by loop { spawn process_message_X }? Then there would be no need for a channel.

> Is this the best way to approach this situation? Or, instead of having one worker handle incoming messages from all receivers (i.e. the tokio::select! implementation), could I spawn dedicated threads to receive messages from each individual stream/channel?

As written in this example, there is no reason not to have a task to process each stream. The only reason to use select! to bring things into one task is if the messages need to all interact with some state owned (or at least managed) by the select!ing task, but in your example there is no such state. But perhaps your real problem has some such state?

Also, it might be less efficient to spawn a task for each message than to have the originally receiving task process the message. Whether that's so will depend on the actual costs of processing each message — whether the multi-threading helps more than spawning hurts.


> But perhaps your real problem has some such state?

Yes, there is shared state across the various processing tasks. In that case, is select! the best option?

> it might be less efficient to spawn a task for each message than to have the originally receiving task process the message

The spawned tasks take anywhere from 100 microseconds to a few hundred milliseconds. (Getting this processing time down through optimization is a separate thing I am working on, but I thought I would isolate the message-ingestion throughput question in this thread.)

I was using thread spawn because, after doing some reading, I came across several recommendations to run background tasks that never end on a dedicated thread (e.g. "Async: What is blocking?" by Alice Ryhl).

One thing I didn't include in the original post, and have now added, is that there is shared state between the tasks run for messages received from the various streams. Given that, would your suggestion to remove the channels still apply? Would I just pass a Mutex wrapping the shared state to each subscribe call, or something like that?

Alice's suggestion applies to the case where your operation is blocking (in other words, you don't use a Tokio runtime).

In your case, since you use async throughout the task, you'd be better off using tokio::spawn to fire off async tasks, and letting Tokio manage threads for you:

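```rust
async fn subscribe(buffer_size: usize, client: Client) -> Result<Receiver<Notification>, Box<dyn Error>> {
    let (sender, receiver) = channel(buffer_size);

    let mut stream = client.subscribe().await?;

    // Run the forwarding loop as a task on the existing runtime instead of
    // on a new OS thread with its own runtime.
    tokio::spawn(async move {
        loop {
            let maybe_notification = stream.try_next().await;
            match maybe_notification {
                Ok(Some(notification)) => {
                    // Stop if the receiver has been dropped.
                    if sender.send(notification).await.is_err() {
                        return;
                    }
                }
                // The stream ended (`Ok(None)`) or returned an error.
                _ => {
                    break;
                }
            }
        }
    });

    Ok(receiver)
}
```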

OK, noted, thanks. That's an easy one to benchmark; I'll implement it and see how it goes.

My suspicion is that there is still some blocking occurring in the tokio::select! part of the code, which is limiting the rate at which I can receive incoming messages.
