So I have a websocket sending protobuf messages down the pipe, which I want to deserialize and then route down the appropriate channel to various actors. Message processing speed/throughput is a priority in this application, so I chose `quick-protobuf`: it uses `Cow<'a, str>` to stay zero-copy on deserialization. Now I need to deal with the lifetimes that come with avoiding those allocations.
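The sketch below uses only std to show what "zero-copy" means for a `Cow<'a, str>` field. It is not quick-protobuf's API. `String::from_utf8_lossy` is used only because it returns a `Cow<'_, str>` that borrows from the input when it can. The important part is that the decoded value is tied to the buffer's lifetime.

```rust
use std::borrow::Cow;

/// Hypothetical stand-in for a zero-copy string field decoder (not
/// quick-protobuf's API). `from_utf8_lossy` returns `Cow::Borrowed` when the
/// bytes are valid UTF-8 (no allocation) and `Cow::Owned` otherwise.
fn decode_str(buf: &[u8]) -> Cow<'_, str> {
    String::from_utf8_lossy(buf)
}

/// True when the decoded value points into `buf` instead of a new allocation.
fn is_zero_copy(buf: &[u8]) -> bool {
    matches!(decode_str(buf), Cow::Borrowed(_))
}

// The borrow ties the result to the buffer. This would NOT compile, because
// `buf` is dropped at the end of the function while the returned Cow still
// borrows from it:
//
// fn broken() -> Cow<'static, str> {
//     let buf = b"ESZ4".to_vec();
//     decode_str(&buf)
// }
```

This is the same constraint that shows up in the snippet further down. Anything decoded from a buffer cannot be sent somewhere that outlives that buffer.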
I am using `tokio-tungstenite` for the websocket client. I think I have a few options here:
- Just punt on the zero-copy idea and use `prost` or something similar. I'm mildly averse to this since I've invested a bit into `quick-protobuf`, but maybe it was the wrong choice here. Was it premature optimization?
- Let the actor do the deserialization instead of the reader, and just send the tungstenite message down the channel along with its `template_id`. I think this should avoid the lifetime issue, since the downstream actor is the one reading the message and updating its own state based on it.
- Figure out the lifetimes. I don't understand them well enough, so I'm unsure how long that would take.
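Here is a minimal sketch of the second option. The reader reads only the template id and hands ownership of the raw bytes to the actor. The actor then decodes views that borrow from a buffer it owns. The names `RawMsg`, `Update`, `decode`, and `run_actor` are hypothetical. `decode` stands in for `T::from_reader`. std threads and `std::sync::mpsc` are used so the example stays self-contained. With tokio the shape would be the same, using `tokio::sync::mpsc` and `.await`.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical message from the reader to an actor: the template id plus
/// ownership of the raw bytes (no decoding done by the reader).
struct RawMsg {
    template_id: i32,
    data: Vec<u8>,
}

/// Hypothetical zero-copy view standing in for a quick-protobuf message:
/// it borrows from the buffer it was decoded from.
struct Update<'a> {
    symbol: &'a str,
}

/// Hypothetical decoder; a real one would call `T::from_reader`.
fn decode(data: &[u8]) -> Option<Update<'_>> {
    std::str::from_utf8(data).ok().map(|symbol| Update { symbol })
}

/// Actor loop: it owns each `RawMsg`, so borrowing from `raw.data` is fine
/// for as long as the actor works with it inside this iteration.
fn run_actor(rx: mpsc::Receiver<RawMsg>) -> Vec<String> {
    let mut seen = Vec::new();
    for raw in rx {
        if raw.template_id == 101 {
            if let Some(update) = decode(&raw.data) {
                // Update actor state from the borrowed view; copy out only
                // what has to outlive `raw`.
                seen.push(update.symbol.to_string());
            }
        }
    }
    seen
}

/// Reader side: forwards owned buffers and never decodes payloads.
fn demo() -> Vec<String> {
    let (tx, rx) = mpsc::channel();
    let actor = thread::spawn(move || run_actor(rx));
    for (id, payload) in [(101, "ESZ4"), (19, "heartbeat"), (101, "NQZ4")] {
        let msg = RawMsg { template_id: id, data: payload.as_bytes().to_vec() };
        tx.send(msg).unwrap();
    }
    drop(tx); // closing the channel ends the actor's loop
    actor.join().unwrap()
}
```

The lifetime problem goes away because each borrow is scoped to one loop iteration inside the task that owns the bytes. Nothing borrowed ever crosses the channel.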
I am wondering how option 2 affects performance when a lot of messages are queued up on the websocket reader. The upper end could be 100,000 small messages per second, though it is likely much less on average. My concern is the extra `await` calls on the channel, compared with having the reader deserialize each message as soon as it comes down the pipe. My guess is that this should be fine given the thread pool and each actor being its own task. Is this a safe assumption?
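At the stated peak, the average budget is 1 s / 100,000 = 10 µs per message. Whether one extra channel hop fits in that budget is easier to measure than to guess. This std-only sketch pushes `n` small owned buffers through a bounded channel to a consumer thread and times it. It uses `std::sync::mpsc::sync_channel`, not tokio's channel, and OS threads, not tasks. Treat it as an order-of-magnitude check on your own hardware, not as a figure for tokio.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::{Duration, Instant};

/// Rough micro-benchmark: send `n` owned buffers of `msg_len` bytes through
/// one bounded channel hop and return (messages received, elapsed time).
fn measure_hop(n: usize, msg_len: usize) -> (usize, Duration) {
    // Bounded, like tokio's mpsc, so a slow consumer applies backpressure.
    let (tx, rx) = mpsc::sync_channel::<Vec<u8>>(1024);
    let consumer = thread::spawn(move || rx.iter().count());
    let start = Instant::now();
    for _ in 0..n {
        tx.send(vec![0u8; msg_len]).unwrap();
    }
    drop(tx); // lets the consumer's iterator finish
    let received = consumer.join().unwrap();
    (received, start.elapsed())
}

/// Average per-message time budget at a given rate: 1 s / rate.
fn budget_per_msg(rate_per_sec: u32) -> Duration {
    Duration::from_secs(1) / rate_per_sec
}
```

Compare `elapsed / n` from `measure_hop(100_000, 64)` against `budget_per_msg(100_000)`. The timing includes allocating each buffer, which the second option would also do when it hands over owned bytes.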
Here is a snippet of the code (showing the reader doing the deserialization as soon as a message comes in) to get the idea across. I would appreciate any insights.
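```rust
use futures_util::StreamExt; // for `.next()` on the websocket stream
use quick_protobuf::{BytesReader, MessageRead};
use tokio::sync::{broadcast, mpsc};
use tokio_tungstenite::tungstenite::Message;
// WS_Stream, RError, Result, get_template_id and the ProtoBuffUpdate* types
// are defined elsewhere in the project.

pub struct PBReader<'a> {
    socket_reader: WS_Stream,
    data_sender: mpsc::Sender<DataMsg<'a>>,
    other_sender: broadcast::Sender<Message>,
}

// Each variant borrows (for 'a) from the buffer it was decoded from.
enum DataMsg<'a> {
    Update1(ProtoBuffUpdate1<'a>),
    Update2(ProtoBuffUpdate2<'a>),
    Update3(ProtoBuffUpdate3<'a>),
    Update4(ProtoBuffUpdate4<'a>),
}

impl<'a> PBReader<'a> {
    pub fn new(
        socket_reader: WS_Stream,
        data_sender: mpsc::Sender<DataMsg<'a>>,
        other_sender: broadcast::Sender<Message>,
    ) -> Self {
        Self {
            data_sender,
            other_sender,
            socket_reader,
        }
    }

    // tokio's mpsc::Sender::send is async, so this has to be async too.
    async fn handle_message(&self, msg: Message) {
        let data = msg.into_data();
        // This is where the lifetimes break: every message decoded below
        // borrows from `data`, which is dropped at the end of this function,
        // so it cannot be sent as a DataMsg<'a> ("`data` does not live long enough").
        match get_template_id(&data) {
            Some(19) => println!("...heartbeat received..."),
            Some(13) => println!("Logout response!"),
            Some(101) => {
                let msg = response_from_bytes::<ProtoBuffUpdate1>(&data);
                let _ = self.data_sender.send(DataMsg::Update1(msg)).await;
            }
            Some(151) => {
                let msg = response_from_bytes::<ProtoBuffUpdate2>(&data);
                let _ = self.data_sender.send(DataMsg::Update2(msg)).await;
            }
            Some(150) => {
                let msg = response_from_bytes::<ProtoBuffUpdate3>(&data);
                let _ = self.data_sender.send(DataMsg::Update3(msg)).await;
            }
            Some(207) => {
                let msg = response_from_bytes::<ProtoBuffUpdate4>(&data);
                let _ = self.data_sender.send(DataMsg::Update4(msg)).await;
            }
            Some(x) => println!("Unhandled msg id {x}"),
            None => println!("Could not read template id {data:?}"),
        }
    }
}

// Zero-copy decode: the returned message borrows from `bytes`.
pub(crate) fn response_from_bytes<'a, T: MessageRead<'a>>(bytes: &'a [u8]) -> T {
    let mut reader = BytesReader::from_bytes(bytes);
    T::from_reader(&mut reader, bytes).expect("Parse Error!")
}

pub(crate) async fn run_reader(mut reader: PBReader<'_>) -> Result<()> {
    // Returns the RError when the stream ends (`next()` yields None);
    // a tungstenite read error just ends the loop and returns Ok(()).
    while let Ok(msg) = reader
        .socket_reader
        .next()
        .await
        .ok_or(RError::Message("websocket read error!".into()))?
    {
        reader.handle_message(msg).await;
    }
    Ok(())
}
```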
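Another way to keep the reader doing the decoding, in the spirit of the third option, is to detach each decoded message from the local buffer before sending it, so the channel carries `DataMsg<'static>`. The sketch below writes `into_owned` by hand on a hypothetical `Update1` with a `Cow<'a, str>` field. It does not assume that quick-protobuf's generated types provide such a method. The trade-off is one allocation per string field, roughly what switching away from zero-copy would cost anyway.

```rust
use std::borrow::Cow;

/// Hypothetical stand-in for a generated message with a string field.
#[derive(Debug, PartialEq)]
struct Update1<'a> {
    symbol: Cow<'a, str>,
    price: i64,
}

impl<'a> Update1<'a> {
    /// Detach from the input buffer: borrowed strings are copied once,
    /// already-owned ones are moved, and the result is 'static.
    fn into_owned(self) -> Update1<'static> {
        Update1 {
            symbol: Cow::Owned(self.symbol.into_owned()),
            price: self.price,
        }
    }
}

/// With 'static payloads the enum can cross a channel without borrowing
/// from the reader's local buffer.
#[derive(Debug, PartialEq)]
enum DataMsg<'a> {
    Update1(Update1<'a>),
}

/// Decode (borrowing from `buf`), then detach before handing it off.
fn decode_and_detach(buf: &[u8]) -> DataMsg<'static> {
    let borrowed = Update1 {
        // Stand-in for real decoding; borrows from `buf` when valid UTF-8.
        symbol: String::from_utf8_lossy(buf),
        price: 42,
    };
    DataMsg::Update1(borrowed.into_owned())
}
```

Because the result no longer borrows from `buf`, the buffer can be dropped immediately. This is exactly what `handle_message` needs to do with `data`.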