I am trying to subscribe to various streaming services, typically gRPC sources, and would like some validation of the direction I am heading in, or some guidance if there is a better way to approach this.
Below is a simplified example of what my program does. I subscribe to multiple streams and create an mpsc channel for each. I then receive incoming messages from these streams concurrently using tokio::select!. Each incoming message has to be processed by an async function, which I spawn as a new task per message so that the receiving loop is not blocked for long before it can read the next message.
EDIT: There is shared state between the tasks spawned from the various receivers, which is why I am using the select! setup.
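```rust
use std::error::Error;

use futures::TryStreamExt; // provides `try_next()` on the subscription stream
use tokio::runtime::Builder;
use tokio::sync::mpsc::{channel, Receiver};

async fn subscribe(buffer_size: usize, client: Client) -> Result<Receiver<Notification>, Box<dyn Error>> {
    let (sender, receiver) = channel(buffer_size);
    let mut stream = client.subscribe().await?;
    // Each subscription is driven by its own OS thread with its own runtime.
    std::thread::spawn(move || {
        let runtime = Builder::new_multi_thread()
            .enable_all()
            .build()
            .expect("Creating Tokio runtime");
        runtime.block_on(async move {
            loop {
                let maybe_notification = stream.try_next().await;
                match maybe_notification {
                    Ok(Some(notification)) => {
                        // Stop forwarding once the receiving side is dropped.
                        if sender.send(notification).await.is_err() {
                            return;
                        }
                    }
                    // End of stream or a stream error.
                    _ => {
                        break;
                    }
                }
            }
        });
    });
    Ok(receiver)
}

fn main() -> Result<(), Box<dyn Error>> {
    let runtime = Builder::new_multi_thread()
        .enable_all()
        .build()?;
    runtime.block_on(async move {
        let client_1 = Client1::new();
        let client_2 = Client2::new();
        let client_3 = Client3::new();
        // `recv()` takes `&mut self`, so the receivers must be mutable.
        let mut receiver_1 = subscribe(100, client_1).await?;
        let mut receiver_2 = subscribe(100, client_2).await?;
        let mut receiver_3 = subscribe(100, client_3).await?;
        loop {
            tokio::select! {
                // `recv()` yields `None` once a channel is closed; matching on
                // `Some(..)` disables that branch instead of spinning on `None`.
                Some(notification) = receiver_1.recv() => {
                    tokio::spawn(process_message_1(notification));
                }
                Some(notification) = receiver_2.recv() => {
                    tokio::spawn(process_message_2(notification));
                }
                Some(notification) = receiver_3.recv() => {
                    tokio::spawn(process_message_3(notification));
                }
                // All three channels are closed.
                else => break,
            }
        }
        Ok(())
    })
}
```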
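For comparison, here is a minimal, dependency-free sketch of the same fan-in done with a single channel instead of a `select!` over three receivers: every producer holds a clone of one `Sender`, and the message type is an enum tagged by source. It uses `std::sync::mpsc` and `std::thread` so it runs without any crates; `tokio::sync::mpsc::Sender` is also `Clone`, so the same shape should carry over. The `Incoming` enum, its variants, and the summing consumer are hypothetical stand-ins for the three notification types and their processing.

```rust
use std::sync::mpsc::{self, Receiver};
use std::thread;

/// Hypothetical message type: one variant per upstream source.
enum Incoming {
    Source1(u32),
    Source2(u32),
    Source3(u32),
}

/// Starts one producer thread per source; all of them send into clones of a
/// single `Sender`, so the consumer has only one receiver to read from.
fn fan_in() -> Receiver<Incoming> {
    let (tx, rx) = mpsc::channel();

    let tx1 = tx.clone();
    thread::spawn(move || {
        for value in 1..=3 {
            // A send error only means the consumer has gone away.
            let _ = tx1.send(Incoming::Source1(value));
        }
    });

    let tx2 = tx.clone();
    thread::spawn(move || {
        let _ = tx2.send(Incoming::Source2(10));
    });

    // The original `tx` moves into the last producer, so once all three
    // producers finish, every sender is dropped and the receiver closes.
    thread::spawn(move || {
        let _ = tx.send(Incoming::Source3(100));
    });

    rx
}

/// Sums the values per source until all producers hang up; the single
/// `match` plays the role of the three `select!` branches.
fn consume(rx: Receiver<Incoming>) -> (u32, u32, u32) {
    let mut totals = (0, 0, 0);
    for msg in rx {
        match msg {
            Incoming::Source1(v) => totals.0 += v,
            Incoming::Source2(v) => totals.1 += v,
            Incoming::Source3(v) => totals.2 += v,
        }
    }
    totals
}
```

With one receiver, the consumer loop is a plain `match`, and it ends on its own once every sender has been dropped.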
This generally works well when the incoming message volume isn't too high, but during peaks in activity it can't sustain the required throughput, so I momentarily fall behind the stream. A few general questions:
- Is this the best way to approach this situation? Or, instead of having one worker handle incoming messages from all receivers (i.e. the tokio::select! loop), could I spawn dedicated threads that each receive messages from one individual stream/channel?
- Are there any improvements I could make, especially to increase throughput and so reduce the overall latency of receiving incoming updates?
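A minimal sketch of the dedicated-consumer-per-stream option from the first question, assuming the shared state can sit behind an `Arc<Mutex<_>>`: each thread owns one receiver, so a burst on one stream does not hold up reads on the others, and the lock is held only for the update itself. `SharedState`, `start_consumers`, and the keep-the-maximum update rule are illustrative assumptions, not the original program's logic. It uses std threads to stay self-contained; with tokio, each receiver could instead be moved into its own `tokio::spawn` task.

```rust
use std::collections::HashMap;
use std::sync::mpsc::Receiver;
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};

/// Hypothetical shared state: the largest value seen per key, across all streams.
type SharedState = Arc<Mutex<HashMap<String, u64>>>;

/// Starts one consumer thread per receiver. Each thread owns its receiver and
/// locks the shared state only while applying a single update.
fn start_consumers(
    receivers: Vec<Receiver<(String, u64)>>,
    state: SharedState,
) -> Vec<JoinHandle<()>> {
    receivers
        .into_iter()
        .map(|rx| {
            let state = Arc::clone(&state);
            thread::spawn(move || {
                // The loop ends when the producing side drops its sender.
                for (key, value) in rx {
                    let mut map = state.lock().unwrap();
                    let entry = map.entry(key).or_insert(0);
                    *entry = (*entry).max(value);
                    // The guard is dropped here, before the next `recv`.
                }
            })
        })
        .collect()
}
```

Whether this beats a single `select!` loop depends on lock contention: if every message touches the same state, the threads may simply end up queueing on the mutex.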
As an aside, I sometimes notice that spawning a task takes much longer than expected (say 3-10 ms), which on its own is enough to make me fall behind. Is there a better way to process the messages than spawning a new task for each incoming one?
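One alternative to spawning a task per message is to drain whatever is already queued and handle it as a batch, so any per-task overhead is paid once per batch rather than once per message. Below is a minimal std-only sketch; `recv_batch` and `max_batch` are hypothetical names. It blocks for the first message, then takes more with `try_recv` until the queue is momentarily empty or the batch is full. Recent tokio versions offer `Receiver::recv_many` for a similar purpose, if your version has it.

```rust
use std::sync::mpsc::Receiver;

/// Blocks until at least one message arrives, then drains already-queued
/// messages without waiting until the batch holds `max_batch` items
/// (`max_batch` should be at least 1). Returns an empty Vec once the
/// channel is closed and empty.
fn recv_batch<T>(rx: &Receiver<T>, max_batch: usize) -> Vec<T> {
    let mut batch = Vec::new();
    match rx.recv() {
        Ok(first) => batch.push(first),
        // All senders dropped and nothing left to read.
        Err(_) => return batch,
    }
    while batch.len() < max_batch {
        match rx.try_recv() {
            Ok(msg) => batch.push(msg),
            // Queue is empty right now (or closed): return what we have.
            Err(_) => break,
        }
    }
    batch
}
```

A caller would loop on `recv_batch`, hand each non-empty batch to one processing step (or one spawned task), and stop when it gets back an empty batch.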