Tokio with actor model: Lifetime issue with protobuf deserialization and subsequent send on channel to actor

So I have a websocket sending protobuf messages down the pipe, which I want to deserialize and then route down the appropriate channel to various actors. This is an application where message processing speed/throughput is a priority. I chose quick-protobuf for this reason: it uses Cow<'a, str> to try to stay zero-copy on deserialization. I now need to deal with the lifetimes that come along with avoiding those allocations.
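To make the lifetime concrete, here is a minimal std-only sketch. `Update`, `parse`, and `into_owned` are hypothetical stand-ins, not quick-protobuf's generated code; the point is only that a `Cow::Borrowed` field ties the message to the buffer it was parsed from, and that copying into `Cow::Owned` is what detaches it.

```rust
use std::borrow::Cow;

// Hypothetical stand-in for a generated message type with a borrowed string field.
#[derive(Debug, Clone, PartialEq)]
struct Update<'a> {
    symbol: Cow<'a, str>,
}

// Zero-copy "parse": the returned value borrows from `buf` and cannot outlive it.
fn parse(buf: &[u8]) -> Update<'_> {
    Update {
        symbol: Cow::Borrowed(std::str::from_utf8(buf).expect("valid UTF-8")),
    }
}

// Copies the borrowed data so the result no longer depends on the buffer.
fn into_owned(update: Update<'_>) -> Update<'static> {
    Update {
        symbol: Cow::Owned(update.symbol.into_owned()),
    }
}

fn main() {
    let owned = {
        let buf = b"ESZ4".to_vec();
        let borrowed = parse(&buf);
        assert!(matches!(borrowed.symbol, Cow::Borrowed(_)));
        // `borrowed` could not leave this block, but the owned copy can.
        into_owned(borrowed)
    };
    assert_eq!(owned.symbol, "ESZ4");
}
```

The copy in `into_owned` is exactly the allocation that zero-copy parsing was meant to avoid, which is why the choice of where to deserialize matters.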

I am using tokio-tungstenite for the websocket client. I think I have a few options here:

  1. Just punt on the zero-copy idea and try prost or something. I'm mildly averse to this since I've invested a bit into quick-protobuf, but maybe it was the wrong choice here -- was it premature optimization?
  2. Let the actor do the deserialization instead of the reader, and just send the tungstenite message down the channel, along with its template_id. I think this should avoid the lifetime issue, since the downstream actor is the one reading the message and updating its own state based on what it reads.
  3. Figure out the lifetimes -- not something I understand well enough, so I'm unsure how long it may take me.

I am wondering about option 2's impact on performance when a lot of messages are queued up on the websocket reader (I think the upper end could be 100,000 small msgs per second, but likely much less on average), given the extra await calls on the channel, compared with having the reader deserialize each message right away when it comes down the pipe. I am guessing it should be fine given the thread pool and each actor being its own task. Is this a safe assumption?

Here is a snippet of the code (showing the reader doing the deserialization as soon as a message comes in) to get the idea across. I would appreciate any insights from you all.



pub struct PBReader<'a> {
    socket_reader: WS_Stream,
    data_sender: mpsc::Sender<DataMsg<'a>>,
    other_sender: broadcast::Sender<Message>,
}

enum DataMsg<'a> {
    Update1(ProtoBuffUpdate1<'a>),
    Update2(ProtoBuffUpdate2<'a>),
    Update3(ProtoBuffUpdate3<'a>),
    Update4(ProtoBuffUpdate4<'a>),
}

    
impl<'a> PBReader<'a> {
    pub fn new(
        socket_reader: WS_Stream,
        data_sender: mpsc::Sender<DataMsg<'a>>,
        other_sender: broadcast::Sender<Message>,
    ) -> Self {
        Self {
            data_sender,
            other_sender,
            socket_reader,
        }
    }

    async fn handle_message(&self, msg: Message) {
        let data = msg.into_data();
        match get_template_id(&data) {
            Some(19) => println!("...heartbeat received..."),
            Some(13) => println!("Logout response!"),
            Some(101) => {
                // The parsed message borrows from `data`, which is dropped when
                // this function returns; this is where the lifetime issue shows up.
                let msg = response_from_bytes::<ProtoBuffUpdate1>(&data);
                self.data_sender.send(DataMsg::Update1(msg)).await;
            }
            Some(151) => {
                let msg = response_from_bytes::<ProtoBuffUpdate2>(&data);
                self.data_sender.send(DataMsg::Update2(msg)).await;
            }
            Some(150) => {
                let msg = response_from_bytes::<ProtoBuffUpdate3>(&data);
                self.data_sender.send(DataMsg::Update3(msg)).await;
            }
            Some(207) => {
                let msg = response_from_bytes::<ProtoBuffUpdate4>(&data);
                self.data_sender.send(DataMsg::Update4(msg)).await;
            }
            Some(x) => println!("Unhandled msg id {x}"),
            None => println!("Could not read template id {data:?}"),
        }
    }
}

pub(crate) fn response_from_bytes<'a, T: MessageRead<'a>>(bytes: &'a [u8]) -> T {
    let mut reader = BytesReader::from_bytes(bytes);
    T::from_reader(&mut reader, bytes).expect("Parse Error!")
}

pub(crate) async fn run_reader(mut reader: PBReader<'_>) -> Result<()> {
    while let Ok(msg) = reader
        .socket_reader
        .next()
        .await
        .ok_or(RError::Message("websocket read error!".into()))?
    {
        reader.handle_message(msg).await;
    }
    Ok(())
}

You can't send any types with lifetimes annotated on them through an actor channel.
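As a rough illustration of why, here is a std-only sketch that uses `std::thread` and `std::sync::mpsc` as stand-ins for a spawned tokio task and a tokio channel (an assumption made so the example runs without external crates). `spawn_counter` is a hypothetical helper. Spawning requires everything moved into the new thread or task to be `'static`, so the message type cannot borrow from a buffer that only lives for one iteration of the reader loop.

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical helper: spawns a consumer that counts the messages it receives.
// The `'static` bound is what thread::spawn demands of everything it captures;
// spawning a task on a multi-threaded runtime imposes the same kind of bound.
fn spawn_counter<T: Send + 'static>(rx: mpsc::Receiver<T>) -> thread::JoinHandle<usize> {
    thread::spawn(move || rx.iter().count())
}

fn main() {
    // An owned message type (String) satisfies the `'static` bound.
    let (tx, rx) = mpsc::channel::<String>();
    let handle = spawn_counter(rx);

    {
        let buf = String::from("tick");
        // Copy out of the short-lived buffer before sending.
        tx.send(buf.as_str().to_owned()).unwrap();
        // Sending `buf.as_str()` itself over a channel of `&str` would not
        // compile here: `buf` is dropped at the end of this block while the
        // receiver is still alive.
    }

    drop(tx); // closing the channel lets the consumer finish
    assert_eq!(handle.join().unwrap(), 1);
}
```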


Okay, got it. I ended up just wrapping Vec<u8>, which is what tungstenite's Message::into_data() returns, in the DataMsg enum variants. The receiving actor then matches on the enum and deserializes to the appropriate type. It seems to work. I suppose this just moves where the "backpressure" ends up, since the reader can forward things more quickly if it is only forwarding bytes; maybe this is actually better.
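A minimal sketch of that arrangement, again using std threads and channels as stand-ins for tokio tasks and channels. `Update1` and `decode_update1` are hypothetical placeholders for a quick-protobuf generated type and its parse call; only the shape of the design is the point.

```rust
use std::borrow::Cow;
use std::sync::mpsc;
use std::thread;

// Owned envelope: the variant records the template id, the payload is raw bytes.
enum DataMsg {
    Update1(Vec<u8>),
    Update2(Vec<u8>),
}

// Hypothetical stand-in for a generated message type that borrows from its input.
struct Update1<'a> {
    symbol: Cow<'a, str>,
}

// Hypothetical stand-in for the real parse call.
fn decode_update1(bytes: &[u8]) -> Update1<'_> {
    Update1 {
        symbol: String::from_utf8_lossy(bytes),
    }
}

// The actor owns each payload and decodes it where it is consumed, so the
// borrowed message never has to cross the channel.
fn run_actor(rx: mpsc::Receiver<DataMsg>) -> Vec<String> {
    let mut seen = Vec::new();
    for msg in rx {
        match msg {
            DataMsg::Update1(bytes) => {
                let update = decode_update1(&bytes); // borrow ends with this arm
                seen.push(format!("update1:{}", update.symbol));
            }
            DataMsg::Update2(bytes) => seen.push(format!("update2:{} bytes", bytes.len())),
        }
    }
    seen
}

fn main() {
    let (tx, rx) = mpsc::channel();
    let actor = thread::spawn(move || run_actor(rx));

    // The reader side only tags and forwards bytes.
    tx.send(DataMsg::Update1(b"ESZ4".to_vec())).unwrap();
    tx.send(DataMsg::Update2(vec![0u8; 3])).unwrap();
    drop(tx);

    let seen = actor.join().unwrap();
    assert_eq!(seen, vec!["update1:ESZ4", "update2:3 bytes"]);
}
```

The payload buffer is moved, not copied, when it goes through the channel, so the zero-copy parse is preserved; it simply happens on the actor's side.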

Thanks Alice!
